libdsproc3  2.0
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups
dsproc_dataset_filters.c
Go to the documentation of this file.
1 /*******************************************************************************
2 *
3 * COPYRIGHT (C) 2012 Battelle Memorial Institute. All Rights Reserved.
4 *
5 ********************************************************************************
6 *
7 * Author:
8 * name: Brian Ermold
9 * phone: (509) 375-2277
10 * email: brian.ermold@pnl.gov
11 *
12 ********************************************************************************
13 *
14 * REPOSITORY INFORMATION:
15 * $Revision: 63190 $
16 * $Author: ermold $
17 * $Date: 2015-08-12 18:09:37 +0000 (Wed, 12 Aug 2015) $
18 *
19 ********************************************************************************
20 *
21 * NOTE: DOXYGEN is used to generate documentation for this file.
22 *
23 *******************************************************************************/
24 
25 /** @file dsproc_dataset_filters.c
26  * Dataset Filtering Functions.
27  */
28 
29 #include <string.h>
30 #include <math.h>
31 
32 #include "dsproc3.h"
33 #include "dsproc_private.h"
34 
35 extern DSProc *_DSProc; /**< Internal DSProc structure */
36 
37 /** @privatesection */
38 
39 /*******************************************************************************
40  * Static Data and Functions Visible Only To This Module
41  */
42 
43 /** Flag used to disable the NaN filtering warnings. */
45 
46 /** Flag used to allow overlapping records to be filtered. */
48 
49 /*******************************************************************************
50  * Private Functions Visible Only To This Library
51  */
52 
53 /**
54  * Compare all samples in dataset1 with the samples in dataset2.
55  *
56  * This function assumes that the time values have already been compared.
57  *
58  * @param dataset1 - pointer to dataset 1
59  * @param start1 - start sample in dataset 1
60  * @param dataset2 - pointer to dataset 2
61  * @param start2 - start sample in dataset 2
62  * @param count - number of samples to compare
63  *
64  * @return
65  * - 1 if all samples have identical data values
66  * - 0 if differences were found
67  */
68 int _dsproc_compare_samples(
69  CDSGroup *dataset1,
70  size_t start1,
71  CDSGroup *dataset2,
72  size_t start2,
73  size_t count)
74 {
75  CDSDim *time_dim1 = cds_get_dim(dataset1, "time");
76  CDSDim *time_dim2 = cds_get_dim(dataset2, "time");
77  CDSVar *var1;
78  CDSVar *var2;
79  size_t var1_count;
80  size_t var2_count;
81  void *data1;
82  void *data2;
83  size_t sample_size;
84  size_t nbytes;
85  int vi;
86 
87  /* Make sure both datasets have a time dimension */
88 
89  if (!time_dim1 || !time_dim2) {
90  if (time_dim1 || time_dim2) {
91  return(0);
92  }
93  return(1);
94  }
95 
96  /* Loop over all variables in dataset1 */
97 
98  for (vi = 0; vi < dataset1->nvars; vi++) {
99 
100  /* Check if this variable has the time dimension
101  * and data for the samples to compare */
102 
103  var1 = dataset1->vars[vi];
104  if ((var1->dims[0] != time_dim1) ||
105  (var1->sample_count <= start1)) {
106 
107  continue;
108  }
109 
110  /* Skip the time variables */
111 
112  if ((strcmp(var1->name, "time") == 0) ||
113  (strcmp(var1->name, "time_offset") == 0)) {
114 
115  continue;
116  }
117 
118  /* Make sure dataset2 has this variable */
119 
120  var2 = cds_get_var(dataset2, var1->name);
121  if (!var2) return(0);
122 
123  /* Make sure the variable in dataset2 has the time dimension
124  * and data for the samples to compare */
125 
126  if ((var2->dims[0] != time_dim2) ||
127  (var2->sample_count <= start2)) {
128 
129  return(0);
130  }
131 
132  /* Make sure the number of samples to compare from dataset1
133  * is less than or equal to the number of samples that exist
134  * in dataset2. */
135 
136  var1_count = var1->sample_count - start1;
137  if (var1_count > count) var1_count = count;
138 
139  var2_count = var2->sample_count - start2;
140  if (var2_count > count) var2_count = count;
141 
142  if (var1_count > var2_count) return(0);
143 
144  /* Make sure the data types match */
145 
146  if (var1->type != var2->type) return(0);
147 
148  /* Make sure the sample sizes match */
149 
150  sample_size = cds_var_sample_size(var1);
151  if (sample_size != cds_var_sample_size(var2)) return(0);
152 
153  if (!sample_size) continue;
154 
155  /* Compare the data values */
156 
157  nbytes = sample_size * cds_data_type_size(var1->type);
158 
159  data1 = var1->data.bp + (start1 * nbytes);
160  data2 = var2->data.bp + (start2 * nbytes);
161 
162  if (memcmp(data1, data2, var1_count * nbytes) != 0) return(0);
163 
164  } /* end loop over variables in dataset1 */
165 
166  return(1);
167 }
168 
169 /**
170  * Remove samples from a dataset.
171  *
172  * @param ntimes - input/output: number of times in the dataset
173  * @param times - input/output: array of times in the dataset
174  * @param mask - array of flags indicating the samples to remove
175  * @param dataset - pointer to the dataset
176  */
177 void _dsproc_delete_samples(
178  size_t *ntimes,
179  timeval_t *times,
180  int *mask,
181  CDSGroup *dataset)
182 {
183  CDSDim *time_dim = cds_get_dim(dataset, "time");
184  CDSVar *var;
185  size_t nsamples;
186  size_t nbytes;
187  void *data1;
188  void *data2;
189  timeval_t *time1;
190  timeval_t *time2;
191  int vi;
192  size_t ti;
193 
194  /* Delete the flagged samples */
195 
196  for (vi = 0; vi < dataset->nvars; vi++) {
197 
198  /* Check if this variable has the time dimension
199  * and data defined for it */
200 
201  var = dataset->vars[vi];
202  if ((var->dims[0] != time_dim) ||
203  (var->sample_count == 0)) {
204 
205  continue;
206  }
207 
208  /* Delete the flagged samples */
209 
210  nbytes = cds_var_sample_size(var) * cds_data_type_size(var->type);
211  if (nbytes == 0) continue;
212 
213  data1 = var->data.bp;
214  data2 = var->data.bp;
215  nsamples = (var->sample_count < *ntimes) ? var->sample_count : *ntimes;
216 
217  for (ti = 0; ti < nsamples; ti++) {
218 
219  if (mask[ti]) {
220  var->sample_count -= 1;
221  }
222  else {
223 
224  if (data1 != data2) {
225  memcpy(data1, data2, nbytes);
226  }
227 
228  data1 += nbytes;
229  }
230 
231  data2 += nbytes;
232  }
233 
234  } /* end loop over variables in dataset1 */
235 
236  /* Delete the flagged times */
237 
238  time1 = times;
239  time2 = times;
240 
241  nsamples = 0;
242 
243  for (ti = 0; ti < *ntimes; ti++) {
244 
245  if (!mask[ti]) {
246 
247  if (time1 != time2) {
248  *time1 = *time2;
249  }
250 
251  time1 += 1;
252  nsamples += 1;
253  }
254 
255  time2 += 1;
256  }
257 
258  time_dim->length = *ntimes = nsamples;
259 }
260 
261 /**
262  * Filter out duplicate samples from a dataset.
263  *
264  * This function will filter out samples in a dataset that have identical
265  * times and data values. It will also verify that the remaining samples
266  * are in chronological order.
267  *
  * The times are scanned once. When a record is found whose time is not
  * greater than the last accepted time, the run of records starting there is
  * either flagged in a mask (and later removed with _dsproc_delete_samples())
  * or reported as an error, depending on gFilterOverlaps and the force mode.
  *
268  * A warning mail message will be generated if any duplicate samples were
269  * found and removed.
270  *
271  * If an error occurs in this function it will be appended to the log and
272  * error mail messages, and the process status will be set appropriately.
273  *
  * NOTE(review): this listing has gaps in its embedded line numbering
  * (e.g. 300, 423, 429, 437, 442, 463, ...). Each orphaned format-string
  * argument list in the body belonged to a call whose opening line is not
  * visible here; the statement before "return(0)" in the allocation failure
  * path is also missing.
  *
  * NOTE(review): times[0] is read unconditionally, so the caller must pass
  * *ntimes >= 1.
  *
274  * @param ntimes - input/output: number of times in the dataset
275  * @param times - input/output: array of times in the dataset
276  * @param dataset - input/output: pointer to the dataset
277  *
278  * @return
279  * - 1 if successful
280  * - 0 if an error occurred
281  */
282 int _dsproc_filter_duplicate_samples(
283  size_t *ntimes,
284  timeval_t *times,
285  CDSGroup *dataset)
286 {
287  Mail *warning_mail = msngr_get_mail(MSNGR_WARNING);
288  int force_mode = dsproc_get_force_mode();
289  char *errmsg = (char *)NULL;
290  const char *status = (char *)NULL;
291  int *filter_mask = (int *)NULL;
292  int overlap_type = 0;
293  size_t noverlaps = 0;
294  size_t ndups = 0;
295  size_t total_filtered = 0;
296  timeval_t time1, time2;
297  char ts1[32], ts2[32];
298  size_t mi, ti, tj, tii, tjj;
299 
 /* NOTE(review): the opening line of this call is missing from the listing */
301  "%s: Checking for overlapping samples in dataset\n",
302  dataset->name);
303 
 /* time1 is the last accepted (largest so far) time and tii its index */
304  time1 = times[0];
305  tii = 0;
306 
307  for (tj = 1; tj < *ntimes; ++tj) {
308 
309  /* Check if time1 < time2 */
310 
311  time2 = times[tj];
312 
313  if (TV_LT(time1, time2)) {
314  time1 = time2;
315  tii = tj;
316  continue;
317  }
318 
319  /* The times are not in chronological order,
320  * so search for the start index of the overlap... */
321 
 /* ti ends at the first earlier record whose time is >= time2; it can
  * also end at tj itself when no earlier record qualifies. */
322  for (ti = 0; ti < tj; ++ti) {
323  if (TV_GTEQ(times[ti], time2)) break;
324  }
325 
326  ndups = 0;
327  noverlaps = 0;
328 
329  if (TV_EQ(times[ti], time2)) {
330 
331  /* A time equal to time2 was found,
332  * so check for consecutive duplicate times */
333 
 /* NOTE(review): tjj is not bounded by *ntimes in this loop. For a time
  * sequence such as 1,2,3,1 it appears times[tjj] is read one element
  * past the end of the array - confirm and add a bound. */
334  for (tii = ti+1, tjj = tj+1; tii < tj; ++tii, ++tjj) {
335  if (TV_NEQ(times[tii], times[tjj])) break;
336  }
337 
338  ndups = tjj - tj;
339  }
340  else if (gFilterOverlaps & FILTER_TIME_SHIFTS || force_mode) {
341 
342  /* Filter out overlapping records */
343 
 /* Everything up to the next record that is later than time1 is
  * treated as part of the out-of-order run. */
344  for (tjj = tj+1; tjj < *ntimes; ++tjj) {
345  if (TV_GT(times[tjj], time1)) break;
346  }
347 
348  noverlaps = tjj - tj;
349  overlap_type = 1; // times do not match
350  }
351  else {
352 
353  /* If a time equal to time2 was not found, we have a section
354  * of overlapping records that do not have matching times. */
355 
356  format_timeval(&time1, ts1);
357  format_timeval(&time2, ts2);
358 
359  status = DSPROC_ETIMEORDER;
360  errmsg = msngr_create_string(
361  "%s: Invalid time order found in dataset\n"
362  " -> '%s' < '%s': time of record %d < time of previous record\n",
363  dataset->name, ts2, ts1, (int)tj);
364 
365  break;
366  }
367 
368  /* Check if we found records with duplicate timestamps */
369 
370  if (ndups) {
371 
372  /* Check if these are duplicate or overlapping records */
373 
374  if (!_dsproc_compare_samples(dataset, tj, dataset, ti, ndups)) {
375 
376  if (gFilterOverlaps & FILTER_OVERLAPS || force_mode) {
377  noverlaps = ndups;
378  overlap_type = 2; // times match but data values do not
379  ndups = 0;
380  }
381  else {
382 
383  /* Set status and error message */
384 
385  status = DSPROC_ETIMEOVERLAP;
386 
387  if (ndups == 1) {
388 
389  format_timeval(&times[tj], ts1);
390 
391  errmsg = msngr_create_string(
392  "%s: Overlapping records found in dataset\n"
393  " -> '%s': time of record %d = time of record %d\n",
394  dataset->name, ts1, (int)tj, (int)ti);
395  }
396  else {
397 
398  format_timeval(&times[tj], ts1);
399  format_timeval(&times[tjj-1], ts2);
400 
401  errmsg = msngr_create_string(
402  "%s: Overlapping records found in dataset\n"
403  " -> '%s' to '%s': records %d to %d overlap records %d to %d\n",
404  dataset->name,
405  ts1, ts2, (int)tj, (int)(tjj-1), (int)ti, (int)(tii-1));
406  }
407 
408  break;
409  }
410  }
411  }
412 
413  /* Check if this is the first set of records to be filtered */
414 
415  if (total_filtered == 0) {
416 
 /* The newline flag is cleared while the per-record lines are added
  * and set again after the scan (see the end of the function). */
417  if (warning_mail) {
418  mail_unset_flags(warning_mail, MAIL_ADD_NEWLINE);
419  }
420 
421  if (gFilterOverlaps || force_mode) {
422 
424  "%s: Filtering overlapping records in dataset\n",
425  dataset->name);
426  }
427  else {
428 
430  "%s: Filtering duplicate records in dataset\n",
431  dataset->name);
432  }
433 
 /* One flag per record; zero-initialised so only flagged records
  * are removed. */
434  filter_mask = (int *)calloc(*ntimes, sizeof(int));
435  if (!filter_mask) {
436 
438  "Could not filter overlapping records from dataset: %s\n"
439  " -> memory allocation error\n",
440  dataset->name);
441 
443  return(0);
444  }
445  }
446 
447  /* Set the mask flags */
448 
449  for (mi = tj; mi < tjj; ++mi) {
450  filter_mask[mi] = 1;
451  }
452 
453  total_filtered += ndups + noverlaps;
454 
455  /* Print warning message */
456 
457  if (ndups) {
458 
459  format_timeval(&times[tj], ts1);
460 
461  if (ndups == 1) {
462 
464  " - '%s': record %d is identical to record %d\n",
465  ts1, (int)tj, (int)ti);
466  }
467  else {
468 
469  format_timeval(&times[tjj-1], ts2);
470 
472  " - '%s' to '%s': records %d to %d are identical to records %d to %d\n",
473  ts1, ts2, (int)tj, (int)(tjj-1), (int)ti, (int)(tii-1));
474  }
475 
476  }
477  else if (noverlaps) {
478 
479  format_timeval(&times[tj], ts1);
480 
481  if (noverlaps == 1) {
482 
483  if (overlap_type == 1) {
485  " - '%s': record %d overlaps previous records (invalid time order)\n",
486  ts1, (int)tj);
487  }
488  else {
490  " - '%s': record %d overlaps record %d (data values do not match)\n",
491  ts1, (int)tj, (int)ti);
492  }
493  }
494  else {
495 
496  format_timeval(&times[tjj-1], ts2);
497 
498  if (overlap_type == 1) {
500  " - '%s' to '%s': records %d to %d overlap previous records (invalid time order)\n",
501  ts1, ts2, (int)tj, (int)(tjj-1));
502  }
503  else {
505  " - '%s' to '%s': records %d to %d overlap records %d to %d (data values do not match)\n",
506  ts1, ts2, (int)tj, (int)(tjj-1), (int)ti, (int)(tii-1));
507  }
508  }
509  }
510 
 /* Resume the scan after the flagged run; the loop's ++tj moves to tjj */
511  tj = tjj - 1;
512 
513  } /* end loop over times */
514 
515  if (total_filtered) {
516 
517  if (warning_mail) {
518  mail_set_flags(warning_mail, MAIL_ADD_NEWLINE);
519  }
520 
 /* When an error stopped the scan nothing is deleted, even if some
  * records had already been flagged. */
521  if (errmsg) {
523  " - filtering aborted\n\n%s", errmsg);
524  }
525  else {
526 
527  _dsproc_delete_samples(ntimes, times, filter_mask, dataset);
528 
 /* NOTE(review): total_filtered is a size_t but the visible format
  * string uses %d - confirm against the missing call line. */
530  " - total records filtered: %d\n", total_filtered);
531  }
532 
533  free(filter_mask);
534  }
535 
536  if (errmsg) {
537  ERROR( DSPROC_LIB_NAME, "%s", errmsg);
538  dsproc_set_status(status);
539  free(errmsg);
540  return(0);
541  }
542 
543  return(1);
544 }
545 
546 /**
547  * Filter out previously stored samples from a dataset.
548  *
549  * This function will filter out samples in a dataset that have identical
550  * times and data values of previously stored data. It will also verify
551  * that the remaining samples do not overlap with any previously stored data.
552  *
553  * This function assumes that the times in the specified dataset are all
554  * in chronological order and that no sample times are duplicated.
555  *
556  * A warning mail message will be generated if any duplicate samples were
557  * found and removed.
558  *
559  * If an error occurs in this function it will be appended to the log and
560  * error mail messages, and the process status will be set appropriately.
561  *
  * The work is done in two passes over the observations fetched from the
  * stored files: the first pass flags duplicate/overlapping records and
  * removes them with _dsproc_delete_samples(); the second pass reports an
  * error if any fetched observation still overlaps the remaining times.
  *
  * NOTE(review): this listing has gaps in its embedded line numbering; each
  * orphaned format-string argument list in the body belonged to a call whose
  * opening line is not visible here.
  *
  * NOTE(review): times[0] and times[*ntimes - 1] are read unconditionally,
  * so the caller must pass *ntimes >= 1.
  *
562  * @param ds - pointer to the DataStream structure
563  * @param ntimes - input/output: number of times in the dataset
564  * @param times - input/output: array of times in the dataset
565  * @param dataset - input/output: pointer to the dataset
566  *
567  * @return
568  * - 1 if all remaining samples do not overlap any stored data
569  * - 0 if an error occurred
  *
  * NOTE(review): one failure path (sample times of a fetched observation
  * could not be retrieved) returns -1 instead of 0, which a caller testing
  * for 0 would treat as success - confirm the intended value.
570  */
571 int _dsproc_filter_stored_samples(
572  DataStream *ds,
573  size_t *ntimes,
574  timeval_t *times,
575  CDSGroup *dataset)
576 {
577  Mail *warning_mail = msngr_get_mail(MSNGR_WARNING);
578  int force_mode = dsproc_get_force_mode();
579  char *errmsg = (char *)NULL;
580  const char *status = (char *)NULL;
581  int *filter_mask = (int *)NULL;
582  int found_overlap = 0;
583  int overlap_type = 0;
584  size_t noverlaps = 0;
585  size_t ndups = 0;
586  size_t total_filtered = 0;
587  int ndsfiles;
588  DSFile **dsfiles;
589  CDSGroup *fetched;
590  int nobs;
591  CDSGroup *obs;
592  timeval_t *obs_times;
593  size_t obs_ntimes;
594  timeval_t obs_start;
595  timeval_t obs_end;
596  timeval_t ds_time;
597  char ts1[32], ts2[32];
598  int oi, si, ei;
599  int mi, ti, tj, tii, tjj;
600 
602  "%s: Checking For overlaps with previously stored data\n",
603  dataset->name);
604 
605  /* Check for previously stored data within the time range
606  * of the specified dataset. */
607 
608  ndsfiles = _dsproc_find_dsfiles(
609  ds->dir, &(times[0]), &(times[*ntimes - 1]), &dsfiles);
610 
611  if (ndsfiles < 0) return(0);
612  if (ndsfiles == 0) return(1);
613 
 /* Parent group that receives one child group per fetched observation */
614  fetched = cds_define_group(NULL, ds->name);
615  if (!fetched) {
616 
618  "Could not filter previously stored records from dataset: %s\n"
619  " -> memory allocation error\n",
620  dataset->name);
621 
623  free(dsfiles);
624  return(0);
625  }
626 
 /* When records with mismatched data may be filtered, the fetch range is
  * replaced by single stored times returned by _dsproc_fetch_timevals()
  * around times[0] and times[*ntimes - 1]; when that call fails with
  * obs_ntimes == 0 the dataset's own time is used instead. */
627  if (gFilterOverlaps & FILTER_OVERLAPS || force_mode) {
628 
629  obs_ntimes = 1;
630  if (!_dsproc_fetch_timevals(ds,
631  ndsfiles, dsfiles, NULL, &(times[0]),
632  &obs_ntimes, &obs_start)) {
633 
 /* NOTE(review): this return frees neither dsfiles nor fetched */
634  if (obs_ntimes != 0) return(0);
635  obs_start = times[0];
636  }
637 
638  obs_ntimes = 1;
639  if (!_dsproc_fetch_timevals(ds,
640  ndsfiles, dsfiles, &(times[*ntimes - 1]), NULL,
641  &obs_ntimes, &obs_end)) {
642 
 /* NOTE(review): this return frees neither dsfiles nor fetched */
643  if (obs_ntimes != 0) return(0);
644  obs_end = times[*ntimes - 1];
645  }
646  }
647  else {
648  obs_start = times[0];
649  obs_end = times[*ntimes - 1];
650  }
651 
652  nobs = _dsproc_fetch_dataset(
653  ndsfiles, dsfiles, &obs_start, &obs_end,
654  0, NULL, 0, fetched);
655 
656  free(dsfiles);
657 
 /* Nothing fetched means nothing to filter; a negative count is an error */
658  if (nobs <= 0) {
659 
660  cds_delete_group(fetched);
661 
662  if (nobs < 0) {
663  return(0);
664  }
665 
666  return(1);
667  }
668 
669  /* Loop over retrieved observations */
670 
671  found_overlap = 0;
672 
673  for (oi = 0; oi < nobs; oi++) {
674 
675  obs = fetched->groups[oi];
676 
677  /* Get the times for this observation */
678 
679  obs_ntimes = 0;
680  obs_times = dsproc_get_sample_timevals(obs, 0, &obs_ntimes, NULL);
681 
682  if (!obs_times) {
683  if (obs_ntimes != 0) {
684  cds_delete_group(fetched);
 /* NOTE(review): returns -1 although the documented error value is 0,
  * and filter_mask is not freed if an earlier observation allocated it */
685  return(-1);
686  }
687  continue;
688  }
689 
690  /* Find the time indexes in the specified dataset
691  * that overlap this observation. */
692 
693  obs_start = obs_times[0];
694  obs_end = obs_times[obs_ntimes - 1];
695 
696  si = cds_find_timeval_index(*ntimes, times, obs_start, CDS_GTEQ);
697  if (si < 0) {
698  free(obs_times);
699  continue;
700  }
701 
702  ei = cds_find_timeval_index(*ntimes, times, obs_end, CDS_LTEQ);
703  if (ei < 0) {
704  free(obs_times);
705  continue;
706  }
707 
708  if (ei < si) {
709 
710  /* This observation fits between two records in the dataset.
711  *
712  * This may be ok if all the previous records were filtered
713  * out, or all the remaining records will be filtered out.
714  *
715  * We will need to check for this again after filtering out
716  * all the duplicate records. */
717 
718  free(obs_times);
719  continue;
720  }
721 
722  /* Loop over the dataset times */
723 
 /* ti walks the dataset times in [si, ei]; tj walks the obs times */
724  tj = 0;
725 
726  for (ti = si; ti <= ei; ++ti) {
727 
728  ds_time = times[ti];
729  noverlaps = 0;
730  ndups = 0;
731 
732  /* Skip obs times that are less than this dataset time */
733 
 /* No explicit bound on tj: presumably safe because ds_time is
  * <= obs_end for every ti <= ei - verify against
  * cds_find_timeval_index(). */
734  while ( TV_LT(obs_times[tj], ds_time) ) ++tj;
735 
736  /* We have overlapping records if the times are not equal */
737 
738  if ( TV_NEQ(obs_times[tj], ds_time) ) {
739 
740  /* The start ds time is >= the first obs time and
741  * the end ds time is <= the last obs time,
742  *
743  * so if we get here we know that:
744  *
745  * obs_times[tj-1] < times[ti] < obs_times[tj] */
746 
747  if (gFilterOverlaps & FILTER_TIME_SHIFTS || force_mode) {
748 
749  /* Filter out dataset times until we find one equal to
750  * an obs_time, or greater than the last obs time. */
751 
752  for (tii = ti+1, tjj = tj;
753  tii <= ei && tjj < (int)obs_ntimes;
754  ++tii) {
755 
756  if ( TV_EQ(times[tii], obs_times[tjj]) ) break;
757  while ( TV_GT(times[tii], obs_times[tjj]) ) ++tjj;
758  }
759 
760  noverlaps = tii - ti;
761  overlap_type = 1; // ds times do not line up with obs times
762  }
763  else {
764  found_overlap = 1;
765  break;
766  }
767  }
768  else {
769 
770  /* Check for consecutive duplicate times */
771 
772  for (tii = ti+1, tjj = tj+1;
773  tii <= ei && tjj < (int)obs_ntimes;
774  ++tii, ++tjj) {
775 
776  if ( TV_NEQ(times[tii], obs_times[tjj]) ) break;
777  }
778 
779  ndups = tii - ti;
780 
781  /* Check if these are duplicate or overlapping records */
782 
783  if (!_dsproc_compare_samples(dataset, ti, obs, tj, ndups)) {
784  if (gFilterOverlaps & FILTER_OVERLAPS || force_mode) {
785  noverlaps = ndups;
786  ndups = 0;
787  overlap_type = 2; // times match but data values do not
788  }
789  else {
790  found_overlap = 1;
791  break;
792  }
793  }
794  }
795 
796  /* Check if this is the first record being filtered */
797 
798  if (total_filtered == 0) {
799 
800  if (warning_mail) {
801  mail_unset_flags(warning_mail, MAIL_ADD_NEWLINE);
802  }
803 
805  "%s: Filtering data previously stored in file: %s\n",
806  dataset->name, obs->name);
807 
 /* One zero-initialised flag per dataset record */
808  filter_mask = (int *)calloc(*ntimes, sizeof(int));
809  if (!filter_mask) {
810 
812  "Could not filter previously stored records from dataset: %s\n"
813  " -> memory allocation error\n",
814  dataset->name);
815 
817  cds_delete_group(fetched);
818  free(obs_times);
819  return(0);
820  }
821  }
822 
823  /* Set the mask flags */
824 
825  for (mi = ti; mi < tii; ++mi) {
826  filter_mask[mi] = 1;
827  }
828 
829  total_filtered += ndups + noverlaps;
830 
831  /* Print warning message */
832 
833  if (ndups) {
834 
835  if (ndups == 1) {
836 
837  format_timeval(&times[ti], ts1);
838 
840  " - '%s': duplicate record %d\n",
841  ts1, (int)ti);
842  }
843  else {
844 
845  format_timeval(&times[ti], ts1);
846  format_timeval(&times[tii-1], ts2);
847 
849  " - '%s' to '%s': duplicate records %d to %d\n",
850  ts1, ts2, (int)ti, (int)(tii-1));
851  }
852  }
853  else if (noverlaps) {
854 
855  if (noverlaps == 1) {
856 
857  format_timeval(&times[ti], ts1);
858 
859  if (overlap_type == 1) {
861  " - '%s': overlapping record %d (times do not match)\n",
862  ts1, (int)ti);
863  }
864  else {
866  " - '%s': overlapping record %d (data values do not match)\n",
867  ts1, (int)ti);
868  }
869  }
870  else {
871 
872  format_timeval(&times[ti], ts1);
873  format_timeval(&times[tii-1], ts2);
874 
875  if (overlap_type == 1) {
877  " - '%s' to '%s': overlapping records %d to %d (times do not match)\n",
878  ts1, ts2, (int)ti, (int)(tii-1));
879  }
880  else {
882  " - '%s' to '%s': overlapping records %d to %d (data values do not match)\n",
883  ts1, ts2, (int)ti, (int)(tii-1));
884  }
885  }
886  }
887 
 /* Stop once either the dataset range or the obs times are used up */
888  if ((tii > ei) || (tjj == (int)obs_ntimes)) break;
889 
 /* Resume after the flagged run; the loop's ++ti moves to tii */
890  ti = tii - 1;
891  tj = tjj;
892 
893  } /* end loop over dataset times */
894 
895  free(obs_times);
896 
897  if (found_overlap) break;
898 
899  } /* end loop over observations */
900 
901  /* Check if an overlap was found */
902 
903  if (found_overlap) {
904 
905  /* Set status and error message */
906 
 /* si, ei and obs still describe the observation that triggered the
  * break out of the loop above. */
907  status = DSPROC_ETIMEOVERLAP;
908 
909  if (ei == si) {
910 
911  format_timeval(&times[si], ts1);
912 
913  errmsg = msngr_create_string(
914  "%s: Overlapping records found with previously stored data\n"
915  " -> '%s': record %d overlaps data in: %s\n",
916  dataset->name, ts1, si, obs->name);
917  }
 /* NOTE(review): this branch looks unreachable here - the first pass
  * continues to the next observation whenever ei < si, before
  * found_overlap can be set. */
918  else if (ei < si) {
919 
920  format_timeval(&times[ei], ts1);
921  format_timeval(&times[si], ts2);
922 
923  errmsg = msngr_create_string(
924  "%s: Overlapping records found with previously stored data\n"
925  " -> '%s' to '%s': records %d to %d overlap data in: %s\n",
926  dataset->name, ts1, ts2, ei, si, obs->name);
927  }
928  else {
929 
930  format_timeval(&times[si], ts1);
931  format_timeval(&times[ei], ts2);
932 
933  errmsg = msngr_create_string(
934  "%s: Overlapping records found with previously stored data\n"
935  " -> '%s' to '%s': records %d to %d overlap data in: %s\n",
936  dataset->name, ts1, ts2, si, ei, obs->name);
937  }
938  }
939 
940  /* Check if any duplicates need to be filtered */
941 
942  if (total_filtered) {
943 
944  if (warning_mail) {
945  mail_set_flags(warning_mail, MAIL_ADD_NEWLINE);
946  }
947 
 /* When an overlap error was found nothing is deleted, even if some
  * records had already been flagged. */
948  if (errmsg) {
950  " - filtering aborted\n\n%s", errmsg);
951  }
952  else {
953 
954  _dsproc_delete_samples(ntimes, times, filter_mask, dataset);
955 
 /* NOTE(review): total_filtered is a size_t but the visible format
  * string uses %d - confirm against the missing call line. */
957  " - total records filtered: %d\n", total_filtered);
958  }
959 
960  free(filter_mask);
961  }
962 
963  /* Generate error message if an overlap was found */
964 
965  if (errmsg) {
966  ERROR( DSPROC_LIB_NAME, "%s", errmsg);
967  dsproc_set_status(status);
968  free(errmsg);
969  cds_delete_group(fetched);
970  return(0);
971  }
972 
973  /* Now we need to loop over all retrieved observations again
974  * to verify that there are no overlapping records */
975 
 /* Every record was filtered out, so nothing is left that could overlap */
976  if (*ntimes == 0) {
977  cds_delete_group(fetched);
978  return(1);
979  }
980 
981  for (oi = 0; oi < nobs; oi++) {
982 
983  obs = fetched->groups[oi];
984 
985  /* Get the start and end times of this observation */
986 
987  obs_ntimes = dsproc_get_time_range(obs, &obs_start, &obs_end);
988  if (!obs_ntimes) continue;
989 
990  /* Find the time indexes in the specified dataset
991  * that overlap this observation. */
992 
993  si = cds_find_timeval_index(*ntimes, times, obs_start, CDS_GTEQ);
994  if (si < 0) continue;
995 
996  ei = cds_find_timeval_index(*ntimes, times, obs_end, CDS_LTEQ);
997  if (ei < 0) continue;
998 
999  /* This observation still overlaps the specified dataset */
1000 
 /* Unlike the first pass, ei < si (the observation falls between two
  * remaining records) is reported as an overlap here. */
1001  if (ei == si) {
1002 
1003  format_timeval(&times[si], ts1);
1004 
1006  "%s: Overlapping records found with previously stored data\n"
1007  " -> '%s': record %d overlaps data in: %s\n",
1008  dataset->name, ts1, si, obs->name);
1009  }
1010  else if (ei < si) {
1011 
1012  format_timeval(&times[ei], ts1);
1013  format_timeval(&times[si], ts2);
1014 
1016  "%s: Overlapping records found with previously stored data\n"
1017  " -> '%s' to '%s': records %d to %d overlap data in: %s\n",
1018  dataset->name, ts1, ts2, ei, si, obs->name);
1019  }
1020  else {
1021 
1022  format_timeval(&times[si], ts1);
1023  format_timeval(&times[ei], ts2);
1024 
1026  "%s: Overlapping records found with previously stored data\n"
1027  " -> '%s' to '%s': records %d to %d overlap data in: %s\n",
1028  dataset->name, ts1, ts2, si, ei, obs->name);
1029  }
1030 
1032  cds_delete_group(fetched);
1033  return(0);
1034 
1035  } /* end second loop over observations */
1036 
1037  cds_delete_group(fetched);
1038  return(1);
1039 }
1040 
1041 /** @publicsection */
1042 
1043 /*******************************************************************************
1044  * Internal Functions Visible To The Public
1045  */
1046 
1047 /**
1048  * Disable the warning messages from the NaN/Inf Filter.
1049  */
1051 {
1053 }
1054 
1055 /**
1056  * Replace NaN and Inf values in a variable with missing values.
1057  *
1058  * This function will only replace NaN and Inf values in variables that have
1059  * a missing value defined.
1060  *
  * Variables that are not of type CDS_FLOAT or CDS_DOUBLE are left untouched
  * and 0 is returned. Every non-finite value is overwritten with the first
  * missing value returned by dsproc_get_var_missing_values().
  *
1061  * If an error occurs in this function it will be appended to the log and
1062  * error mail messages, and the process status will be set appropriately.
1063  *
  * NOTE(review): the line holding this function's signature is missing from
  * this listing; the only visible caller invokes it as
  * dsproc_filter_var_nans(var) and stores the result in an int.
  *
1064  * @param var - pointer to the variable
1065  *
1066  * @return
1067  * - number of NaN/Inf values replaced
1068  * - -1 if a memory allocation error occurs
1069  */
1071 {
1072  int nmissings;
1073  CDSData missings;
1074  size_t sample_size;
1075  size_t nvalues;
1076  int nan_count;
1077 
1078  /* Only floats and doubles can have NaN/Inf values */
1079 
1080  if (var->type != CDS_FLOAT &&
1081  var->type != CDS_DOUBLE) {
1082 
1083  return(0);
1084  }
1085 
1086  /* Check if this variable has any missing values defined */
1087 
1088  missings.vp = (void *)NULL;
1089  nmissings = dsproc_get_var_missing_values(var, &(missings.vp));
1090 
 /* Zero (none defined) and negative (error) counts are passed straight
  * back to the caller. */
1091  if (nmissings <= 0) return(nmissings);
1092 
1093  /* Get the total number of values in the variables data array */
1094 
1095  sample_size = dsproc_var_sample_size(var);
1096  if (!sample_size) {
1097  free(missings.vp);
1098  return(0);
1099  }
1100 
1101  nvalues = var->sample_count * sample_size;
1102 
1103  /* Loop over all values, replacing NaNs and Infs with missing */
1104 
 /* nvalues is bumped by one so that the pre-decrement loops below visit
  * exactly nvalues elements and do nothing when there is no data. */
1105  nan_count = 0;
1106  nvalues += 1;
1107 
1108  if (var->type == CDS_FLOAT) {
1109 
1110  float *datap = var->data.fp;
1111  float missing = *(missings.fp);
1112 
1113  while (--nvalues) {
1114 
1115  if (!isfinite(*datap)) {
1116  *datap = missing;
1117  ++nan_count;
1118  }
1119 
1120  ++datap;
1121  }
1122  }
1123  else {
1124 
1125  double *datap = var->data.dp;
1126  double missing = *(missings.dp);
1127 
1128  while (--nvalues) {
1129 
1130  if (!isfinite(*datap)) {
1131  *datap = missing;
1132  ++nan_count;
1133  }
1134 
1135  ++datap;
1136  }
1137  }
1138 
 /* The missing value array is released here on the normal path */
1139  free(missings.vp);
1140 
1141  return(nan_count);
1142 }
1143 
1144 /**
1145  * Replace NaN and Inf values in a dataset with missing values.
1146  *
1147  * This function will only replace NaN and Inf values in variables that have
1148  * a missing value defined.
1149  *
  * Each float or double variable in the dataset, other than the time
  * variables, is passed to dsproc_filter_var_nans().
  *
1150  * If the warn flag is set, a warning mail message will be generated if any
1151  * NaN or Inf values are replaced with missing values.
  * The warnings are also suppressed while gDisableNanFilterWarnings is set.
1152  *
1153  * If an error occurs in this function it will be appended to the log and
1154  * error mail messages, and the process status will be set appropriately.
1155  *
  * NOTE(review): this listing has gaps in its embedded line numbering; each
  * orphaned format-string argument list in the body belonged to a call whose
  * opening line is not visible here.
  *
1156  * @param dataset - pointer to the dataset
1157  * @param warn - flag specifying if warning messages should be generated
1158  * (0 == false, 1 == true)
1159  *
1160  * @return
1161  * - 1 if successful
1162  * - 0 if a memory allocation error occurs
1163  */
1164 int dsproc_filter_dataset_nans(CDSGroup *dataset, int warn)
1165 {
1166  Mail *warning_mail = msngr_get_mail(MSNGR_WARNING);
1167  int total_nans = 0;
1168  int is_base_time;
1169  CDSVar *var;
1170  int found_nans;
1171  int vi;
1172 
1174  "%s: Checking for Nan/Inf values in dataset\n",
1175  dataset->name);
1176 
1177  /* Loop over all variables in the dataset */
1178 
1179  for (vi = 0; vi < dataset->nvars; ++vi) {
1180 
1181  var = dataset->vars[vi];
1182 
1183  /* Skip variables that are not float or doubles */
1184 
1185  if (var->type != CDS_FLOAT &&
1186  var->type != CDS_DOUBLE) {
1187 
1188  continue;
1189  }
1190 
1191  /* Skip the time variables */
1192 
 /* is_base_time is only an output slot for the call; it is not read */
1193  if (cds_is_time_var(var, &is_base_time)) {
1194  continue;
1195  }
1196 
1197  /* Filter NaN/Inf values */
1198 
1199  found_nans = dsproc_filter_var_nans(var);
1200 
 /* A negative count is the error return of dsproc_filter_var_nans() */
1201  if (found_nans < 0) {
1202  return(0);
1203  }
1204 
1205  /* Generate Warning */
1206 
1207  if (warn && found_nans && !gDisableNanFilterWarnings) {
1208 
 /* First variable with replacements: emit the heading once and
  * clear the newline flag until the summary line is added. */
1209  if (!total_nans) {
1210 
1211  if (warning_mail) {
1212  mail_unset_flags(warning_mail, MAIL_ADD_NEWLINE);
1213  }
1214 
1216  "%s: Replacing NaN/Inf values with missing values\n",
1217  dataset->name);
1218  }
1219 
1221  " - %s: replaced %d NaN/Inf values\n",
1222  var->name, found_nans);
1223  }
1224 
1225  total_nans += found_nans;
1226  }
1227 
1228  if (warn && total_nans && !gDisableNanFilterWarnings) {
1229 
1230  if (warning_mail) {
1231  mail_set_flags(warning_mail, MAIL_ADD_NEWLINE);
1232  }
1233 
1235  " - total NaN/Inf values replaced: %d\n", total_nans);
1236  }
1237 
1238  return(1);
1239 }
1240 
1241 /**
1242  * Filter overlapping data records.
1243  *
1244  * This switch can be used to configure the filtering logic to remove
1245  * data records from a dataset that overlap with records in either the
1246  * current dataset or previously stored data.
1247  *
1248  * The available modes are:
1249  *
1250  * - FILTER_DUPS_ONLY: This is the default setting and can be used to
1251  * reset the filtering mode back to duplicate
1252  * records only.
1253  *
1254  * - FILTER_TIME_SHIFTS: Filter records that are not in chronological order.
1255  * This filters data records with times that fall
1256  * in-between two records in either the current
1257  * dataset or previously stored data.
1258  *
1259  * - FILTER_OVERLAPS: Filter records with the same times but different
1260  * data values as records in either the current
1261  * dataset or previously stored data.
1262  *
1263  * - FILTER_ALL: Same as FILTER_TIME_SHIFTS | FILTER_OVERLAPS.
1264  *
1265  * @param mode - filtering mode
1266  */
1268 {
1269  gFilterOverlaps = mode;
1270 }
1271 
1272 /*******************************************************************************
1273  * Public Functions
1274  */