libdsproc3  2.0
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups
dsproc.c
Go to the documentation of this file.
1 /*******************************************************************************
2 *
3 * COPYRIGHT (C) 2010 Battelle Memorial Institute. All Rights Reserved.
4 *
5 ********************************************************************************
6 *
7 * Author:
8 * name: Brian Ermold
9 * phone: (509) 375-2277
10 * email: brian.ermold@pnl.gov
11 *
12 ********************************************************************************
13 *
14 * REPOSITORY INFORMATION:
15 * $Revision: 80903 $
16 * $Author: ermold $
17 * $Date: 2017-10-02 16:05:06 +0000 (Mon, 02 Oct 2017) $
18 *
19 ********************************************************************************
20 *
21 * NOTE: DOXYGEN is used to generate documentation for this file.
22 *
23 *******************************************************************************/
24 
25 /** @file dsproc.c
26  * Data System Process Library Functions.
27  */
28 
29 #include <signal.h>
30 #include <sys/resource.h>
31 
32 #ifndef __GNUC__
33 #include <ucontext.h>
34 #endif
35 
36 #include "dsproc3.h"
37 #include "dsproc_private.h"
38 #include "trans.h"
39 
    /* Value used for both the soft and hard RLIMIT_CORE limits in
     * _init_signal_handlers() (that code is only built when __GNUC__ is
     * not defined). */
40 #define COREDUMPSIZE 12000000 /**< Maximum size of core files */
41 
    /* Declared here, defined in another translation unit. */
42 extern int _InsideFinishProcessHook;
43 
44 /** @privatesection */
45 
46 /*******************************************************************************
47  * Private Data
48  */
49 
    /* The four variables below have external linkage (no 'static'). */
50 DSProc *_DSProc = (DSProc *)NULL; /**< DSProc structure */
51 int _DisableDBUpdates = 0; /**< flag used to disable database updates */
52 int _DisableLockFile = 0; /**< flag used to disable the lock file */
53 int _DisableMail = 0; /**< flag used to disable mail messages */
54 
    /* Log location/name overrides. _LogsRoot is filled in lazily by
     * _get_logs_root(); _LogsDir, _LogFile and _LogID are consulted by the
     * log and lockfile initialisation functions when non-NULL. */
55 static char *_LogsRoot = (char *)NULL; /**< root path to the logs directory */
56 static char *_LogsDir = (char *)NULL; /**< full path to the logs directory */
57 static char *_LogFile = (char *)NULL; /**< name of the log file */
58 static char *_LogID = (char *)NULL; /**< replace time with ID in log file */
59 static int _Reprocessing = 0; /**< reprocessing mode flag */
60 static int _DynamicDODs = 0; /**< dynamic DODs mode flag */
61 static int _Force = 0; /**< force process past non-fatal errors */
62 static int _LogInterval = 0; /**< log file interval */
63 static int _LogDataTime = 0; /**< use data time for log time */
64 
65 static char _InputDir[PATH_MAX]; /**< input dir from ingest file loop */
66 static char _InputFile[PATH_MAX]; /**< input file from ingest file loop */
67 static char _InputSource[PATH_MAX]; /**< full path to input file */
68 
69 static int _RealTimeMode = 0; /**< real time mode flag */
70 
    /* A negative value means "not set": _dsproc_init() then reads the
     * "max_run_time" configuration value instead. */
71 static int _MaxRunTime = -1; /**< max runtime of process */
72 
73 static int _AsynchronousMode = 0; /**< allow asynchronous processing */
74 
75 /** maximum wait time for input data when running in real-time mode */
    /* presumably seconds, i.e. 3 days -- TODO confirm the unit at the point of use */
76 static time_t _MaxRealTimeWait = 3 * 86400;
77 
78 /*******************************************************************************
79  * Static Functions Visible Only To This Module
80  */
81 
/*
 * Return the root path to the logs directory.
 *
 * The path is looked up with dsenv_get_logs_root() the first time it is
 * needed and cached in the file-scope _LogsRoot pointer; later calls return
 * the cached pointer without repeating the lookup.
 *
 * Returns the cached path, or NULL when the lookup returns a negative status
 * or a status of 0 (the message below reports that the LOGS_DATA environment
 * variable was not found).
 *
 * NOTE(review): the embedded listing numbers skip 91, 96 and 100, so the
 * statements on those lines are not visible in this extract. The string
 * literals on 97-98 are the tail of a call whose first line is missing.
 */
82 static char *_get_logs_root()
83 {
84  int status;
85 
86  if (!_LogsRoot) {
87 
88  status = dsenv_get_logs_root(&_LogsRoot);
89 
90  if (status < 0) {
92  return((char *)NULL);
93  }
94  else if (status == 0) {
95 
97  "Could not get path to logs directory\n"
98  " -> the LOGS_DATA environment variable was not found\n");
99 
101  return((char *)NULL);
102  }
103  }
104 
105  return(_LogsRoot);
106 }
107 
/*
 * Create the lockfile for this process.
 *
 * The lockfile directory is a copy of _LogsDir when that is set, otherwise
 * "<logs root>/<site>/lockfiles". The lockfile name is
 * "<site><facility>-<proc_name>-<proc_type>.lock". Both strings are stored
 * in _DSProc->lockfile_path and _DSProc->lockfile_name and stay allocated on
 * success so that _unlock_process() can use and free them.
 *
 * Parameters: site, facility, proc_name, proc_type -- used to build the
 * lockfile path/name and to prefix the messages.
 *
 * Returns 1 on success (or immediately when _DisableLockFile is set) and 0
 * on failure. On the failure paths after allocation the stored strings are
 * freed and the pointers reset to NULL.
 *
 * NOTE(review): listing lines 122, 148, 163, 175, 186 and 201 are missing
 * from this extract; each orphaned string-literal group below is the tail of
 * a call whose first line is not visible.
 */
108 static int _lock_process(
109  const char *site,
110  const char *facility,
111  const char *proc_name,
112  const char *proc_type)
113 {
114  char *lockfile_root;
115  int status;
116  char errstr[MAX_LOCKFILE_ERROR];
117 
118  if (_DisableLockFile) {
119  return(1);
120  }
121 
123  "Creating process lockfile:\n");
124 
125  /* Determine path to the lockfiles directory */
126 
127  if (_LogsDir) {
128  _DSProc->lockfile_path = strdup(_LogsDir);
129  }
130  else {
131 
132  /* Put lockfiles under the logs root directory */
133 
134  lockfile_root = _get_logs_root();
135  if (!lockfile_root) {
136  return(0);
137  }
138 
139  /* Create lockfile path string */
140 
141  _DSProc->lockfile_path = msngr_create_string(
142  "%s/%s/lockfiles",
143  lockfile_root, site);
144  }
145 
    /* Covers a NULL result from either strdup() or msngr_create_string(). */
146  if (!_DSProc->lockfile_path) {
147 
149  "%s%s-%s-%s: Could not create lockfile\n"
150  " -> memory allocation error\n",
151  site, facility, proc_name, proc_type);
152 
153  return(0);
154  }
155 
156  /* Create the lockfile name */
157 
158  _DSProc->lockfile_name = msngr_create_string("%s%s-%s-%s.lock",
159  site, facility, proc_name, proc_type);
160 
161  if (!_DSProc->lockfile_name) {
162 
164  "%s%s-%s-%s: Could not create lockfile\n"
165  " -> memory allocation error\n",
166  site, facility, proc_name, proc_type);
167 
168  free(_DSProc->lockfile_path);
169 
170  _DSProc->lockfile_path = (char *)NULL;
171 
172  return(0);
173  }
174 
176  " - path: %s\n"
177  " - name: %s\n",
178  _DSProc->lockfile_path, _DSProc->lockfile_name);
179 
180  status = lockfile_create(
181  _DSProc->lockfile_path, _DSProc->lockfile_name, 0,
182  MAX_LOCKFILE_ERROR, errstr);
183 
    /* Any status <= 0 is treated as failure; errstr is included in the
     * message built below. */
184  if (status <= 0) {
185 
187  "%s%s-%s-%s: %s\n",
188  site, facility, proc_name, proc_type, errstr);
189 
190  free(_DSProc->lockfile_path);
191  free(_DSProc->lockfile_name);
192 
193  _DSProc->lockfile_path = (char *)NULL;
194  _DSProc->lockfile_name = (char *)NULL;
195 
196  return(0);
197  }
198 
    /* Status 2 still counts as success; per the message text it is
     * reported as a stale lockfile having been removed. */
199  if (status == 2) {
200 
202  "%s%s-%s-%s: Removed stale lockfile\n"
203  " -> %s/%s\n",
204  site, facility, proc_name, proc_type,
205  _DSProc->lockfile_path, _DSProc->lockfile_name);
206  }
207 
208  return(1);
209 }
210 
/*
 * Remove the process lockfile and release the stored path/name strings.
 *
 * Does nothing when _DisableLockFile is set. Otherwise calls
 * lockfile_remove() with _DSProc->lockfile_path / lockfile_name, builds a
 * message containing errstr when the returned status is negative, and then
 * always frees both strings and resets the pointers to NULL.
 *
 * NOTE(review): listing lines 220 and 232 are missing from this extract;
 * the orphaned string literals below are the tails of calls whose first
 * lines are not visible.
 */
211 static void _unlock_process(void)
212 {
213  int status;
214  char errstr[MAX_LOCKFILE_ERROR];
215 
216  if (_DisableLockFile) {
217  return;
218  }
219 
221  "Removing process lockfile:\n"
222  " - path: %s\n"
223  " - name: %s\n",
224  _DSProc->lockfile_path, _DSProc->lockfile_name);
225 
226  status = lockfile_remove(
227  _DSProc->lockfile_path, _DSProc->lockfile_name,
228  MAX_LOCKFILE_ERROR, errstr);
229 
230  if (status < 0) {
231 
233  "%s%s-%s-%s: %s\n",
234  _DSProc->site, _DSProc->facility,
235  _DSProc->name, _DSProc->type, errstr);
236  }
237 
    /* Cleanup is unconditional: it runs whether or not removal succeeded. */
238  free(_DSProc->lockfile_path);
239  free(_DSProc->lockfile_name);
240 
241  _DSProc->lockfile_path = (char *)NULL;
242  _DSProc->lockfile_name = (char *)NULL;
243 }
244 
/*
 * Open the process log file.
 *
 * Log directory: a copy of _LogsDir when set, otherwise
 * "<logs root>/<site>/proc_logs/<site><proc_name><facility>".
 *
 * Log file name, in order of precedence:
 *   - a copy of _LogFile when set;
 *   - "<site><proc_name><facility>.<_LogID>.<proc_type>" when _LogID is set;
 *   - otherwise a time-stamped name whose resolution depends on
 *     _LogInterval (see the switch below).
 *
 * The path and name are passed to msngr_init_log() and freed right after
 * that call, regardless of its result.
 *
 * Returns 1 on success, 0 on failure.
 *
 * NOTE(review): listing lines 261, 286, 291, 337, 342, 347, 360 and 365 are
 * missing from this extract; the orphaned string-literal groups are the
 * tails of calls whose first lines are not visible.
 */
245 static int _init_process_log(
246  const char *site,
247  const char *facility,
248  const char *proc_name,
249  const char *proc_type)
250 {
251  char *logs_root;
252  char *log_path;
253  char *log_name;
254  time_t log_time;
255  struct tm gmt;
256  int status;
257  char errstr[MAX_LOG_ERROR];
258 
259  memset(&gmt, 0, sizeof(struct tm));
260 
262  "Opening process log file:\n");
263 
264  /* Determine path to the log files directory */
265 
266  if (_LogsDir) {
267  log_path = strdup(_LogsDir);
268  }
269  else {
270 
271  /* Get the root logs directory */
272 
273  logs_root = _get_logs_root();
274  if (!logs_root) {
275  return(0);
276  }
277 
278  /* Create log file path string */
279 
280  log_path = msngr_create_string("%s/%s/proc_logs/%s%s%s",
281  logs_root, site, site, proc_name, facility);
282  }
283 
284  if (!log_path) {
285 
287  "%s%s-%s-%s: Could not open process log\n"
288  " -> memory allocation error\n",
289  site, facility, proc_name, proc_type);
290 
292  return(0);
293  }
294 
295  if (_LogFile) {
296  log_name = strdup(_LogFile);
297  }
298  else if (_LogID) {
299  log_name = msngr_create_string("%s%s%s.%s.%s",
300  site, proc_name, facility, _LogID, proc_type);
301  }
302  else {
303  /* Determine the time to use in the log file name */
304 
    /* The command-line begin time is only used when _LogDataTime is set
     * and that begin time is non-zero; otherwise the current time is used. */
305  if (_LogDataTime && _DSProc->cmd_line_begin) {
306  log_time = _DSProc->cmd_line_begin;
307  }
308  else {
309  log_time = time(NULL);
310  }
311 
312  gmtime_r(&log_time, &gmt);
313 
314  /* Create the log file name */
315 
    /* LOG_DAILY -> YYYYMMDD.000000, LOG_RUN -> YYYYMMDD.hhmmss,
     * anything else -> YYYYMM00.000000 (day fixed to "00"). */
316  switch (_LogInterval) {
317  case LOG_DAILY:
318  log_name = msngr_create_string("%s%s%s.%04d%02d%02d.000000.%s",
319  site, proc_name, facility,
320  (gmt.tm_year + 1900), (gmt.tm_mon + 1), gmt.tm_mday, proc_type);
321  break;
322  case LOG_RUN:
323  log_name = msngr_create_string("%s%s%s.%04d%02d%02d.%02d%02d%02d.%s",
324  site, proc_name, facility,
325  (gmt.tm_year + 1900), (gmt.tm_mon + 1), gmt.tm_mday,
326  gmt.tm_hour, gmt.tm_min, gmt.tm_sec, proc_type);
327  break;
328  default: /* LOG_MONTHLY */
329  log_name = msngr_create_string("%s%s%s.%04d%02d00.000000.%s",
330  site, proc_name, facility,
331  (gmt.tm_year + 1900), (gmt.tm_mon + 1), proc_type);
332  }
333  }
334 
335  if (!log_name) {
336 
338  "%s%s-%s-%s: Could not open process log\n"
339  " -> memory allocation error\n",
340  site, facility, proc_name, proc_type);
341 
343  free(log_path);
344  return(0);
345  }
346 
348  " - path: %s\n"
349  " - name: %s\n",
350  log_path, log_name);
351 
352  status = msngr_init_log(
353  log_path, log_name, (LOG_TAGS | LOG_STATS), MAX_LOG_ERROR, errstr);
354 
    /* The strings are released here, before the status check, so both the
     * success and failure paths are covered. */
355  free(log_path);
356  free(log_name);
357 
358  if (status == 0) {
359 
361  "%s%s-%s-%s: Could not open process log\n"
362  " -> %s\n",
363  site, facility, proc_name, proc_type, errstr);
364 
366  return(0);
367  }
368 
369  return(1);
370 }
371 
/*
 * Open the provenance log file.
 *
 * Log directory: a copy of _LogsDir when set, otherwise
 * "<logs root>/<site>/provenance/<site><proc_name><facility>".
 *
 * Log file name, in order of precedence:
 *   - "<_LogFile>.Provenance" when _LogFile is set;
 *   - "<site><proc_name><facility>.<_LogID>.<proc_type>.Provenance" when
 *     _LogID is set;
 *   - otherwise a name stamped with YYYYMMDD.hhmmss. Unlike
 *     _init_process_log(), _LogInterval is not consulted here.
 *
 * The path and name are passed to msngr_init_provenance() and freed right
 * after that call, regardless of its result.
 *
 * Returns 1 on success, 0 on failure.
 *
 * NOTE(review): listing lines 388, 414, 419, 453, 458, 463, 476 and 481 are
 * missing from this extract; the orphaned string-literal groups are the
 * tails of calls whose first lines are not visible.
 */
372 static int _init_provenance_log(
373  const char *site,
374  const char *facility,
375  const char *proc_name,
376  const char *proc_type)
377 {
378  char *logs_root;
379  char *log_path;
380  char *log_name;
381  time_t log_time;
382  struct tm gmt;
383  int status;
384  char errstr[MAX_LOG_ERROR];
385 
386  memset(&gmt, 0, sizeof(struct tm));
387 
389  "Opening provenance log for: %s%s-%s-%s\n",
390  site, facility, proc_name, proc_type);
391 
392  /* Determine path to the log files directory */
393 
394  if (_LogsDir) {
395  log_path = strdup(_LogsDir);
396  }
397  else {
398 
399  /* Get the root logs directory */
400 
401  logs_root = _get_logs_root();
402  if (!logs_root) {
403  return(0);
404  }
405 
406  /* Create log file path string */
407 
408  log_path = msngr_create_string("%s/%s/provenance/%s%s%s",
409  logs_root, site, site, proc_name, facility);
410  }
411 
412  if (!log_path) {
413 
415  "%s%s-%s-%s: Could not open provenance log\n"
416  " -> memory allocation error\n",
417  site, facility, proc_name, proc_type);
418 
420  return(0);
421  }
422 
423  if (_LogFile) {
424  log_name = msngr_create_string("%s.Provenance", _LogFile);
425  }
426  else if (_LogID) {
427  log_name = msngr_create_string("%s%s%s.%s.%s.Provenance",
428  site, proc_name, facility, _LogID, proc_type);
429  }
430  else {
431  /* Determine the time to use in the log file name */
432 
    /* The command-line begin time is only used when _LogDataTime is set
     * and that begin time is non-zero; otherwise the current time is used. */
433  if (_LogDataTime && _DSProc->cmd_line_begin) {
434  log_time = _DSProc->cmd_line_begin;
435  }
436  else {
437  log_time = time(NULL);
438  }
439 
440  gmtime_r(&log_time, &gmt);
441 
442  /* Create the log file name */
443 
444  log_name = msngr_create_string("%s%s%s.%04d%02d%02d.%02d%02d%02d.%s.Provenance",
445  site, proc_name, facility,
446  (gmt.tm_year + 1900), (gmt.tm_mon + 1), gmt.tm_mday,
447  gmt.tm_hour, gmt.tm_min, gmt.tm_sec, proc_type);
448 
449  }
450 
451  if (!log_name) {
452 
454  "%s%s-%s-%s: Could not open provenance log\n"
455  " -> memory allocation error\n",
456  site, facility, proc_name, proc_type);
457 
459  free(log_path);
460  return(0);
461  }
462 
464  " - path: %s\n"
465  " - name: %s\n",
466  log_path, log_name);
467 
468  status = msngr_init_provenance(
469  log_path, log_name, (LOG_TAGS | LOG_STATS), MAX_LOG_ERROR, errstr);
470 
    /* Released before the status check so both outcomes are covered. */
471  free(log_path);
472  free(log_name);
473 
474  if (status == 0) {
475 
477  "%s%s-%s-%s: Could not open provenance log\n"
478  " -> %s\n",
479  site, facility, proc_name, proc_type, errstr);
480 
482  return(0);
483  }
484 
485  return(1);
486 }
487 
/*
 * Initialise one mail message type with the recipients configured under
 * config_key.
 *
 * Parameters:
 *   mail_type    - message type passed to msngr_init_mail()
 *   mail_from    - sender string passed to msngr_init_mail()
 *   mail_subject - subject string passed to msngr_init_mail()
 *   config_key   - process configuration key whose values are the recipients
 *
 * When status is 1 the proc_conf values are joined with ',' into a single
 * recipient string and msngr_init_mail() is called with it.
 *
 * Returns 1 on success, when _DisableMail is set, or when no recipients are
 * found (the final 'else' branch); returns 0 on failure.
 *
 * NOTE(review): listing lines 505, 510, 529, 533, 562, 567, 571 and 576 are
 * missing from this extract. In particular the call that assigns 'status'
 * and fills 'proc_conf' (its arguments are on 511-512) and any release of
 * 'proc_conf' are not visible, so ownership of proc_conf cannot be checked
 * from here.
 */
488 static int _init_mail(
489  MessageType mail_type,
490  char *mail_from,
491  char *mail_subject,
492  const char *config_key)
493 {
494  int status;
495  ProcConf **proc_conf;
496  char *mail_to;
497  size_t mail_to_length;
498  char errstr[MAX_MAIL_ERROR];
499  int i;
500 
501  if (_DisableMail) {
502  return(1);
503  }
504 
506  "Checking database for '%s' custodians\n", config_key);
507 
508  /* Get the process configuration for this key */
509 
511  _DSProc->site, _DSProc->facility, _DSProc->type, _DSProc->name,
512  config_key, &proc_conf);
513 
514  if (status == 1) {
515 
516  /* Get the length of the mail_to string */
517 
518  mail_to_length = 0;
519 
    /* proc_conf is walked as a NULL-terminated array. Each entry reserves
     * its length plus 2 bytes, which is enough for the ',' separator
     * written below and the terminating NUL. */
520  for (i = 0; proc_conf[i]; i++) {
521  mail_to_length += strlen(proc_conf[i]->value) + 2;
522  }
523 
524  /* Allocate memory for the mail_to string */
525 
    /* calloc() zero-fills, so mail_to starts out as an empty string for
     * the strcat() calls below.
     * NOTE(review): if the array were empty, mail_to_length would be 0 and
     * calloc(0, 1) may return NULL, which would be reported as an
     * allocation failure -- confirm status == 1 implies at least one entry. */
526  mail_to = (char *)calloc(mail_to_length, sizeof(char));
527  if (!mail_to) {
528 
530  "Could not initialize mail message for: %s\n"
531  " -> memory allocation error\n", config_key);
532 
534  return(0);
535  }
536 
537  /* Create the mail_to string */
538 
539  for (i = 0; proc_conf[i]; i++) {
540 
541  DEBUG_LV1( DSPROC_LIB_NAME, " - %s\n", proc_conf[i]->value);
542 
543  if (i) strcat(mail_to, ",");
544 
545  strcat(mail_to, proc_conf[i]->value);
546  }
547 
548  /* Initialize the mail message */
549 
550  status = msngr_init_mail(
551  mail_type,
552  mail_from,
553  mail_to,
554  NULL,
555  mail_subject,
556  MAIL_ADD_NEWLINE,
557  MAX_MAIL_ERROR,
558  errstr);
559 
560  /* Cleanup and exit */
561 
563  free(mail_to);
564 
565  if (status == 0) {
566 
568  "Could not initialize mail message for: %s\n"
569  " -> %s\n", config_key, errstr);
570 
572  return(0);
573  }
574  }
575  else if (status < 0) {
577  return(0);
578  }
579  else {
580  DEBUG_LV1( DSPROC_LIB_NAME, " - none found\n");
581  }
582 
583  return(1);
584 }
585 
586 static void _finish_mail(
587  MessageType mail_type,
588  int mail_error_status,
589  char *status_message,
590  char *last_status_text,
591  time_t last_completed,
592  time_t last_successful,
593  char *finish_time_string)
594 {
595  Mail *mail;
596  char last_completed_string[32];
597  char last_successful_string[32];
598 
599  if (_DisableMail) {
600  return;
601  }
602 
603  mail = msngr_get_mail(mail_type);
604  if (!mail) {
605  return;
606  }
607 
608  if (mail->body[0] != '\0' || mail_error_status) {
609 
610  if (last_status_text) {
611  format_secs1970(last_completed, last_completed_string);
612  format_secs1970(last_successful, last_successful_string);
613  }
614 
615  if (status_message && last_status_text) {
616  mail_printf(mail,
617  "%s\n"
618  "Last Status: %s\n"
619  "Last Completed: %s\n"
620  "Last Successful: %s\n",
621  status_message,
622  last_status_text,
623  last_completed_string,
624  last_successful_string);
625  }
626  else if (status_message) {
627  mail_printf(mail,
628  "%s\n"
629  "No Previous Status Has Been Recorded\n",
630  status_message);
631  }
632  else if (last_status_text) {
633  mail_printf(mail,
634  "Current Status: %s\n"
635  "Status: Memory allocation error creating status message\n"
636  "\n"
637  "Last Status: %s\n"
638  "Last Completed: %s\n"
639  "Last Successful: %s\n",
640  finish_time_string,
641  last_status_text,
642  last_completed_string,
643  last_successful_string);
644  }
645  else {
646  mail_printf(mail,
647  "Current Status: %s\n"
648  "Status: Memory allocation error creating status message\n"
649  "\n"
650  "No Previous Status Has Been Recorded\n",
651  finish_time_string);
652  }
653  }
654 }
655 
/*
 * Handler installed by _init_signal_handlers() for the trapped signals.
 *
 * It maps the signal number to a status string, records it with
 * dsproc_set_status(), optionally tries to move the current input file out
 * of the way (see the 'rename' flag), then runs the finish-process hook,
 * calls dsproc_finish() and exits with the value it returns.
 *
 * Parameters:
 *   sig - signal number being handled
 *   si  - unused (overwritten with NULL below)
 *   uc  - unused (overwritten with NULL below)
 *
 * This function does not return; it ends in exit().
 *
 * NOTE(review): listing lines 721 and 738 are missing from this extract.
 * Line 738 held the last operand and the opening brace of the 'if' that
 * starts on 735, so the full condition is not visible here.
 * NOTE(review): exit() and the cleanup calls made here are not on the POSIX
 * async-signal-safe list -- confirm this is acceptable for this process.
 */
656 static void _signal_handler(int sig, siginfo_t *si, void *uc)
657 {
    /* 'rename' stays 1 for SIGILL, SIGABRT, SIGEMT, SIGFPE, SIGBUS, SIGSEGV,
     * SIGSYS and unknown signals; it is cleared for the other cases below. */
658  int rename = 1;
659  const char *status;
660  int exit_value;
661 
662  si = NULL; /* prevent "unused parameter" compiler warning */
663  uc = NULL; /* prevent "unused parameter" compiler warning */
664 
665  switch (sig) {
666  case SIGQUIT:
667  status = "SIGQUIT: Quit (see termio(7I))";
668  rename = 0;
669  break;
670  case SIGILL:
671  status = "SIGILL: Illegal Instruction";
672  break;
673  case SIGTRAP:
674  status = "SIGTRAP: Trace or Breakpoint Trap";
675  rename = 0;
676  break;
677  case SIGABRT:
678  status = "SIGABRT: Abort";
679  break;
680 #ifndef __GNUC__
681  case SIGEMT:
682  status = "SIGEMT: Emulation Trap";
683  break;
684 #endif
685  case SIGFPE:
686  status = "SIGFPE: Arithmetic Exception";
687  break;
688  case SIGBUS:
689  status = "SIGBUS: Bus Error";
690  break;
691  case SIGSEGV:
692  status = "SIGSEGV: Segmentation Fault";
693  break;
694  case SIGSYS:
695  status = "SIGSYS: Bad System Call";
696  break;
697  case SIGHUP:
698  status = "SIGHUP: Hangup (see termio(7I))";
699  rename = 0;
700  break;
701  case SIGINT:
702  status = "SIGINT: Interrupt (see termio(7I))";
703  rename = 0;
704  break;
705  case SIGPIPE:
706  status = "SIGPIPE: Broken Pipe";
707  rename = 0;
708  break;
709  case SIGALRM:
710  status = "SIGALRM: Alarm Clock";
711  rename = 0;
712  break;
713  case SIGTERM:
714  status = "SIGTERM: Terminated";
715  rename = 0;
716  break;
717  default:
718  status = "Trapped Unknown Signal Type";
719  }
720 
722  "Received Signal: %s\n", status);
723 
724  dsproc_set_status(status);
725 
726 #ifndef __GNUC__
727  if (msngr_debug_level) {
728  printstack(fileno(stdout));
729  }
730 #endif
731 
732  /* If this is an ingest and the force option is enabled
733  * we need to try to move the file that caused the problem. */
734 
735  if (rename &&
736  _DSProc &&
737  _DSProc->model == PM_INGEST &&
739 
740  const char *input_dir = dsproc_get_input_dir();
741  const char *input_file = dsproc_get_input_file();
742 
    /* Only attempted when both the directory and file name are known. */
743  if (input_dir && input_file) {
744  dsproc_force_rename_bad(input_dir, input_file);
745  }
746  }
747 
748  /* Cleanup and exit the process */
749 
750  _dsproc_run_finish_process_hook();
751 
752  exit_value = dsproc_finish();
753  exit(exit_value);
754 }
755 
/*
 * Install _signal_handler() for the signals listed below.
 *
 * All registrations share one zero-initialised 'struct sigaction' with
 * SA_SIGINFO set, so the three-argument handler form is used. The
 * sigaction() calls are chained with '||', so registration stops at the
 * first failure.
 *
 * When __GNUC__ is not defined, SIGEMT is also trapped and the core file
 * size limit (RLIMIT_CORE) is set to COREDUMPSIZE.
 *
 * Returns 1 on success, 0 if a sigaction() or setrlimit() call fails.
 *
 * NOTE(review): listing lines 765, 790, 794, 806 and 810 are missing from
 * this extract; the orphaned string-literal groups are the tails of calls
 * whose first lines are not visible.
 */
756 static int _init_signal_handlers(void)
757 {
758  struct sigaction act;
759 #ifndef __GNUC__
760  struct rlimit rl;
761 #endif
762 
763  memset(&act, 0, sizeof(act));
764 
766  "Initializing signal handlers\n");
767 
768  /* act.sa_mask = sigset_t(0); */
769  act.sa_handler = 0;
770  act.sa_flags = (SA_SIGINFO);
771  act.sa_sigaction = _signal_handler;
772 
773  if (sigaction(SIGHUP, &act, 0) != 0 || /* Hangup (see termio(7I)) */
774  sigaction(SIGINT, &act, 0) != 0 || /* Interrupt (see termio(7I)) */
775  sigaction(SIGQUIT, &act, 0) != 0 || /* Quit (see termio(7I)) */
776  sigaction(SIGILL, &act, 0) != 0 || /* Illegal Instruction */
777  sigaction(SIGTRAP, &act, 0) != 0 || /* Trace or Breakpoint Trap */
778  sigaction(SIGABRT, &act, 0) != 0 || /* Abort */
779 #ifndef __GNUC__
780  sigaction(SIGEMT, &act, 0) != 0 || /* Emulation Trap */
781 #endif
782  sigaction(SIGFPE, &act, 0) != 0 || /* Arithmetic Exception */
783  sigaction(SIGBUS, &act, 0) != 0 || /* Bus Error */
784  sigaction(SIGSEGV, &act, 0) != 0 || /* Segmentation Fault */
785  sigaction(SIGSYS, &act, 0) != 0 || /* Bad System Call */
786  sigaction(SIGPIPE, &act, 0) != 0 || /* Broken Pipe */
787  sigaction(SIGALRM, &act, 0) != 0 || /* Alarm Clock */
788  sigaction(SIGTERM, &act, 0) != 0) { /* Terminated */
789 
791  "Could not initialize signal handlers:\n"
792  " -> %s\n", strerror(errno));
793 
795  return(0);
796  }
797 
798 #ifndef __GNUC__
799  /* Limit the core file size */
800 
    /* Soft and hard limits are set to the same value. */
801  rl.rlim_cur = COREDUMPSIZE;
802  rl.rlim_max = COREDUMPSIZE;
803 
804  if (setrlimit(RLIMIT_CORE, &rl) == -1) {
805 
807  "Could not set core file size limit:\n"
808  " -> %s\n", strerror(errno));
809 
811  return(0);
812  }
813 #endif
814 
815  return(1);
816 }
817 
818 /**
819  * Static: Initialize a data system process.
820  *
821  * This function will:
822  *
823  * - Initialize the mail messages (skipped when _DisableMail is set)
824  * - Update the process start time in the database (skipped when
     *   _DisableDBUpdates is set)
825  * - Initialize the signal handlers
826  * - Define non-standard unit symbols
     * - Set the standard attributes excluded from the DOD compare
827  * - Get process configuration information from database: process
     *   location, site description, and the max_run_time, data_interval
     *   and min_valid_time configuration values
828  *
     * Reads site, facility, name, type and start_time from _DSProc and
     * stores max_run_time, data_interval and min_valid_time back into it.
     *
     * NOTE(review): listing lines 895, 899, 906, 911, 958, 962, 978 and 982
     * are missing from this extract. Line 899 held the start of the call
     * whose result is tested on 902; the other gaps precede orphaned
     * string-literal groups.
     *
829  * @return
830  * - 1 if successful
831  * - 0 if an error occurred
832  */
833 static int _dsproc_init(void)
834 {
835  const char *site = _DSProc->site;
836  const char *facility = _DSProc->facility;
837  const char *proc_name = _DSProc->name;
838  const char *proc_type = _DSProc->type;
839  time_t start_time = _DSProc->start_time;
840  ProcLoc *proc_loc;
841  const char *site_desc;
842  char mail_from[64];
843  char mail_subject[128];
844  char *config_value;
845  int status;
846 
847  /************************************************************
848  * Initialize mail messages
849  *************************************************************/
850 
851  if (!_DisableMail) {
852 
    /* snprintf() bounds each write to the buffer size given, so over-long
     * names are truncated rather than overflowing. */
853  snprintf(mail_from, 64, "%s%s%s", site, proc_name, facility);
854 
855  /* Error Mail */
856 
857  snprintf(mail_subject, 128,
858  "%s Error: %s%s.%s ", proc_type, site, facility, proc_name);
859 
860  if (!_init_mail(MSNGR_ERROR, mail_from, mail_subject, "error_mail")) {
861  return(0);
862  }
863 
864  /* Warning Mail */
865 
866  snprintf(mail_subject, 128,
867  "%s Warning: %s%s.%s ", proc_type, site, facility, proc_name);
868 
869  if (!_init_mail(MSNGR_WARNING, mail_from, mail_subject, "warning_mail")) {
870  return(0);
871  }
872 
873  /* Mentor Mail */
874 
875  snprintf(mail_subject, 128,
876  "%s Message: %s%s.%s ", proc_type, site, facility, proc_name);
877 
878  if (!_init_mail(MSNGR_MAINTAINER, mail_from, mail_subject, "mentor_mail")) {
879  return(0);
880  }
881  }
882 
883  /************************************************************
884  * Update process start time in the database
885  *************************************************************/
886 
887  if (!_DisableDBUpdates) {
888 
    /* The time string is only formatted when it will be reported. */
889  if (msngr_debug_level || msngr_provenance_level) {
890 
891  char time_string[32];
892 
893  format_secs1970(start_time, time_string);
894 
896  "Updating process start time in database: %s\n", time_string);
897  }
898 
900  _DSProc->dsdb, site, facility, proc_type, proc_name, start_time);
901 
    /* Both 0 and negative results are failures; only 0 gets the
     * "unexpected NULL result" message built below. */
902  if (status <= 0) {
903 
904  if (status == 0) {
905 
907  "Could not update process start time in database\n"
908  " -> unexpected NULL result from database");
909  }
910 
912  return(0);
913  }
914  }
915 
916  /************************************************************
917  * Initialize the signal handlers
918  *************************************************************/
919 
920  if (!_init_signal_handlers()) {
921  return(0);
922  }
923 
924  /************************************************************
925  * Map non-standard unit symbols used by ARM to standard
926  * units in the UDUNITS-2 dictionary.
927  *************************************************************/
928 
929  if (!cds_map_symbol_to_unit("C", "degree_Celsius") ||
930  !cds_map_symbol_to_unit("deg", "degree") ||
931  !cds_map_symbol_to_unit("mb", "millibar") ||
932  !cds_map_symbol_to_unit("srad", "steradian") ||
933  !cds_map_symbol_to_unit("ster", "steradian") ||
934  !cds_map_symbol_to_unit("unitless", "1")) {
935 
936  return(0);
937  }
938 
939  /************************************************************
940  * Set the standard attributes we should exclude from the
941  * dod compare.
942  *************************************************************/
943 
944  if (!_dsproc_set_standard_exclude_atts()) {
945  return(0);
946  }
947 
948  /************************************************************
949  * Get the process location
950  *************************************************************/
951 
    /* proc_loc is not used further in this function; the call is made for
     * its status. */
952  status = dsproc_get_location(&proc_loc);
953 
954  if (status <= 0) {
955 
956  if (status == 0) {
957 
959  "Could not get process location from database\n"
960  " -> unexpected NULL result from database query\n");
961 
963  }
964 
965  return(0);
966  }
967 
968  /************************************************************
969  * Get the site description
970  *************************************************************/
971 
    /* site_desc is not used further in this function; the call is made for
     * its status. */
972  status = dsproc_get_site_description(&site_desc);
973 
974  if (status <= 0) {
975 
976  if (status == 0) {
977 
979  "Could not get site description from database\n"
980  " -> unexpected NULL result from database query\n");
981 
983  }
984 
985  return(0);
986  }
987 
988  /************************************************************
989  * Get the max runtime
990  *************************************************************/
991 
    /* A non-negative _MaxRunTime takes precedence over the "max_run_time"
     * configuration value; if neither is available the value is 0. */
992  if (_MaxRunTime >= 0) {
993  _DSProc->max_run_time = _MaxRunTime;
994  }
995  else {
996  status = dsproc_get_config_value("max_run_time", &config_value);
997 
998  if (status == 1) {
    /* atoi() does no error reporting: non-numeric text yields 0. */
999  _DSProc->max_run_time = (time_t)atoi(config_value);
1000  free(config_value);
1001  }
1002  else if (status == 0) {
1003  _DSProc->max_run_time = (time_t)0;
1004  }
1005  else {
1006  return(0);
1007  }
1008  }
1009 
1010  /************************************************************
1011  * Get the data expectation interval
1012  *************************************************************/
1013 
1014  status = dsproc_get_config_value("data_interval", &config_value);
1015 
1016  if (status == 1) {
1017  _DSProc->data_interval = (time_t)atoi(config_value);
1018  free(config_value);
1019  }
1020  else if (status == 0) {
1021  _DSProc->data_interval = (time_t)0;
1022  }
1023  else {
1024  return(0);
1025  }
1026 
1027  /************************************************************
1028  * Get minimum valid data time
1029  *************************************************************/
1030 
1031  status = dsproc_get_config_value("min_valid_time", &config_value);
1032 
1033  if (status == 1) {
1034  _DSProc->min_valid_time = (time_t)atoi(config_value);
1035  free(config_value);
1036  }
1037  else if (status == 0) {
1038  /* 694224000 = 1992-01-01 00:00:00 */
1039  _DSProc->min_valid_time = (time_t)694224000;
1040  }
1041  else {
1042  return(0);
1043  }
1044 
1045  return(1);
1046 }
1047 
1048 /**
1049  * Static: Read the next processing interval begin time file.
1050  *
     * The file is named ".next_begin_time" and is looked for in the
     * directory given by the 'path' member of the LogFile returned by
     * msngr_get_log_file(). Its first line is parsed as YYYYMMDD.hhmmss and
     * converted with get_secs1970().
     *
     * NOTE(review): the result of msngr_get_log_file() is dereferenced
     * without a NULL check -- confirm the log is always open at this point.
     * NOTE(review): listing lines 1075, 1079, 1086, 1095, 1099, 1107, 1112
     * and 1127 are missing from this extract; the orphaned string-literal
     * groups are the tails of calls whose first lines are not visible.
     *
     * @param begin_time - output: set only when 1 is returned
     *
1051  * @return
1052  * - 1 if successful
1053  * - 0 if the file doesn't exist
1054  * - -1 if an error occurred
1055  */
1056 static int _read_next_begin_time_file(time_t *begin_time)
1057 {
1058  LogFile *log = msngr_get_log_file();
1059  const char *path = log->path;
1060  const char *file = ".next_begin_time";
1061  char full_path[PATH_MAX];
1062  FILE *fp;
1063  char timestamp[64];
1064  int count;
1065  int year, mon, day, hour, min, sec;
1066 
1067  /* Check to see if the next begin time file exists */
1068 
1069  snprintf(full_path, PATH_MAX, "%s/%s", path, file);
1070 
1071  if (access(full_path, F_OK) != 0) {
1072 
    /* ENOENT (file does not exist) is the expected "no file" case and is
     * not an error; any other errno is. */
1073  if (errno != ENOENT) {
1074 
1076  "Could not access file: %s\n"
1077  " -> %s\n", full_path, strerror(errno));
1078 
1080  return(-1);
1081  }
1082 
1083  return(0);
1084  }
1085 
1087  "Getting processing period begin time from file: %s\n",
1088  full_path);
1089 
1090  /* Open the timestamp file */
1091 
1092  fp = fopen(full_path, "r");
1093  if (!fp) {
1094 
1096  "Could not open file: %s\n"
1097  " -> %s\n",
1098  full_path, strerror(errno));
1100  return(-1);
1101  }
1102 
1103  /* Read in the timestamp */
1104 
    /* NOTE(review): fgets() also returns NULL at end-of-file on an empty
     * file, in which case errno may not describe the failure. */
1105  if (!fgets(timestamp, 64, fp)) {
1106 
1108  "Could not read file: %s\n"
1109  " -> %s\n",
1110  full_path, strerror(errno));
1111 
1113  fclose(fp);
1114  return(-1);
1115  }
1116 
1117  fclose(fp);
1118 
1119  /* Convert timestamp to seconds since 1970 */
1120 
1121  count = sscanf(timestamp,
1122  "%4d%2d%2d.%2d%2d%2d",
1123  &year, &mon, &day, &hour, &min, &sec);
1124 
    /* All six fields must be converted for the timestamp to be accepted. */
1125  if (count != 6) {
1126 
1128  "Invalid timestamp format '%s' in file: %s\n"
1129  " -> expected a string of the form YYYYMMDD.hhmmss'\n",
1130  timestamp, full_path);
1131 
1132  dsproc_set_status("Invalid Timestamp Format");
1133  return(-1);
1134  }
1135 
1136  *begin_time = get_secs1970(year, mon, day, hour, min, sec);
1137 
1138  return(1);
1139 }
1140 
1141 /**
1142  * Static: Update the next processing interval begin time file.
1143  *
     * The file is named ".next_begin_time" and lives in the directory given
     * by the 'path' member of the LogFile returned by msngr_get_log_file().
     * It is opened with mode "w", so any previous content is replaced by
     * the timestamp string produced by dsproc_create_timestamp().
     *
     * NOTE(review): the result of msngr_get_log_file() is dereferenced
     * without a NULL check -- confirm the log is always open at this point.
     * NOTE(review): listing lines 1163, 1181, 1185, 1193 and 1198 are
     * missing from this extract; the orphaned string-literal groups are the
     * tails of calls whose first lines are not visible.
     *
     * @param begin_time - begin time to record in the file
     *
1144  * @return
1145  * - 1 if successful
1146  * - 0 if an error occurred
1147  */
1148 static int _update_next_begin_time_file(time_t begin_time)
1149 {
1150  LogFile *log = msngr_get_log_file();
1151  const char *path = log->path;
1152  const char *file = ".next_begin_time";
1153  char full_path[PATH_MAX];
1154  FILE *fp;
1155  char timestamp[64];
1156 
1157  snprintf(full_path, PATH_MAX, "%s/%s", path, file);
1158 
    /* 'timestamp' is used here as scratch space for the reported time; it
     * is overwritten by dsproc_create_timestamp() below. */
1159  if (msngr_debug_level || msngr_provenance_level) {
1160 
1161  format_secs1970(begin_time, timestamp);
1162 
1164  "Updating next processing period begin time file:\n"
1165  " -> file: %s\n"
1166  " -> time: %s\n",
1167  full_path, timestamp);
1168  }
1169 
1170  /* Create the timestamp string for begin_time */
1171 
1172  if (!dsproc_create_timestamp(begin_time, timestamp)) {
1173  return(0);
1174  }
1175 
1176  /* Open the timestamp file */
1177 
1178  fp = fopen(full_path, "w");
1179  if (!fp) {
1180 
1182  "Could not open file: %s\n"
1183  " -> %s\n",
1184  full_path, strerror(errno));
1186  return(0);
1187  }
1188 
1189  /* Update the timestamp */
1190 
    /* NOTE(review): fprintf() returns a negative value on an output error,
     * so '!fprintf(...)' is only true when zero characters were written; a
     * real write error is not detected by this test. The fclose() results
     * below are also unchecked. */
1191  if (!fprintf(fp, "%s", timestamp)) {
1192 
1194  "Could not write to file: %s\n"
1195  " -> %s\n",
1196  full_path, strerror(errno));
1197 
1199  fclose(fp);
1200  return(0);
1201  }
1202 
1203  fclose(fp);
1204 
1205  return(1);
1206 }
1207 
1208 /**
1209  * Static: Check input datastreams for observation loop.
1210  *
1211  * @return
1212  * - 1 use observation loop
1213  * - 0 not using observation loop
1214  * - -1 memory error
1215  */
1216 static void _check_for_obs_loop(void)
1217 {
1218  int dsid;
1219  DataStream *ds;
1220 
1221  for (dsid = 0; dsid < _DSProc->ndatastreams; dsid++) {
1222 
1223  ds = _DSProc->datastreams[dsid];
1224 
1225  if (ds->role == DSR_INPUT &&
1226  ds->flags & DS_OBS_LOOP) {
1227 
1228  _DSProc->use_obs_loop = 1;
1229  break;
1230  }
1231  }
1232 }
1233 
1234 /**
1235  * Static: Set the next processing interval for observation loops.
1236  *
     * For every input datastream flagged with DS_OBS_LOOP, the next file
     * after the search start is looked up with _dsproc_find_next_dsfile().
     * The file with the earliest first time wins; when two files share the
     * same first time, the one with the later last time wins. On success
     * _DSProc->interval_begin is set to the winning first time and
     * _DSProc->interval_end to one second past the winning last time.
     *
     * NOTE(review): the description says "Static" but the definition below
     * has no 'static' keyword, so the function has external linkage.
     * NOTE(review): listing lines 1264, 1306 and 1315 are missing from this
     * extract; the orphaned string-literal groups are the tails of calls
     * whose first lines are not visible.
     *
1237  * @param search_start - start time of search
1238  *
1239  * @return
1240  * - 1 if successful
1241  * - 0 if no new data was found
1242  * - -1 if an error occurred
1243  */
1244 int _set_next_obs_loop_interval(time_t search_start)
1245 {
1246  int dsid;
1247  DataStream *ds;
1248 
1249  DSFile *dsfile;
1250 
1251  timeval_t file_begin;
1252  timeval_t file_end;
1253 
1254  timeval_t search_begin = { search_start, 0 };
    /* begin.tv_sec == 0 doubles as the "nothing found yet" marker. */
1255  timeval_t begin = { 0, 0 };
1256  timeval_t end = { 0, 0 };
1257 
1258  int status;
1259 
1260  const char *next_obs = NULL;
1261  char ts1[32];
1262  char ts2[32];
1263 
1265  "Searching for next observation after: %s\n",
1266  format_timeval(&search_begin, ts1));
1267 
1268  for (dsid = 0; dsid < _DSProc->ndatastreams; dsid++) {
1269 
1270  ds = _DSProc->datastreams[dsid];
1271 
1272  if (ds->role != DSR_INPUT) continue;
1273 
1274  if (!(ds->flags & DS_OBS_LOOP)) continue;
1275 
1276  status = _dsproc_find_next_dsfile(ds->dir, &search_begin, &dsfile);
1277 
1278  if (status < 0) {
1279  // error
1280  return(-1);
1281  }
1282 
1283  if (status == 0) {
1284  // no files found after specified search start time
1285  continue;
1286  }
1287 
    /* First and last sample times of the candidate file.
     * NOTE(review): assumes dsfile->ntimes >= 1 -- confirm. */
1288  file_begin = dsfile->timevals[0];
1289  file_end = dsfile->timevals[dsfile->ntimes - 1];
1290 
    /* Take this file if it is the first candidate or starts earlier than
     * the current best; on an equal start prefer the later end. */
1291  if (!begin.tv_sec || TV_LT(file_begin, begin)) {
1292  begin = file_begin;
1293  end = file_end;
1294  next_obs = dsfile->name;
1295  }
1296  else if (TV_EQ(file_begin, begin)) {
1297  if (TV_GT(file_end, end)) {
1298  end = file_end;
1299  next_obs = dsfile->name;
1300  }
1301  }
1302  }
1303 
1304  if (!begin.tv_sec) {
1305 
1307  " - none found\n");
1308 
1309  return(0);
1310  }
1311 
    /* Only the whole-second parts are stored; the end is pushed one second
     * past the last sample time. */
1312  _DSProc->interval_begin = begin.tv_sec;
1313  _DSProc->interval_end = end.tv_sec + 1;
1314 
1316  " - found %s from %s to %s\n",
1317  next_obs,
1318  format_secs1970(_DSProc->interval_begin, ts1),
1319  format_secs1970(_DSProc->interval_end, ts2));
1320 
1321  return(1);
1322 }
1323 
1324 /**
1325  * Static: Set the next processing period begin time for real time processes.
1326  *
1327  * @return
1328  * - 1 if successful
1329  * - 0 if the process needs to wait for new data
1330  * - -1 if an error occurred
1331  */
1332 static int _set_next_real_time_begin(void)
1333 {
1334  int ndsids;
1335  int *dsids;
1336  int dsid;
1337  int i;
1338 
1339  time_t now = time(NULL);
1340  timeval_t search = { 0, 0 };
1341  timeval_t found = { 0, 0 };
1342  size_t ntimes;
1343 
1344  time_t begin;
1345  time_t end;
1346  struct tm gmt;
1347  int status;
1348 
1349  char ts1[32];
1350 
1351  /* Get the next begin time from the "next begin time" file if it exists. */
1352 
1353  begin = 0;
1354  status = _read_next_begin_time_file(&begin);
1355  if (status < 0) {
1356  return(-1);
1357  }
1358 
1359  /* If the "next begin time" file does not exist, use the earliest end
1360  * time of all output datastreams to determine the next begin time. */
1361 
1362  if (status == 0) {
1363 
1364  begin = 0;
1365  end = 0;
1366 
1367  /* Get the IDs of all output datastreams */
1368 
1369  ndsids = dsproc_get_output_datastream_ids(&dsids);
1370  if (ndsids < 0) return(-1);
1371 
1372  for (i = 0; i < ndsids; ++i) {
1373 
1374  dsid = dsids[i];
1375  ntimes = 1;
1376  search.tv_sec = now;
1377 
1378  if (!dsproc_fetch_timevals(
1379  dsid, NULL, &search, &ntimes, &found)) {
1380 
1381  if (ntimes != 0) {
1382  free(dsids);
1383  return(-1);
1384  }
1385 
1386  continue;
1387  }
1388 
1389  if (end == 0 || found.tv_sec < end) {
1390  end = found.tv_sec;
1391  }
1392  }
1393 
1394  free(dsids);
1395 
1396  if (end != 0) {
1397 
1399  "Getting processing period begin time from earliest output datastream end time\n");
1400 
1401  /* Set begin time to the start of the next processing interval
1402  * after the earliest output datastream end time. */
1403 
1404  begin = cds_get_midnight(end);
1405  while (begin < end) begin += _DSProc->proc_interval;
1406  }
1407  }
1408 
1409  /* If we still haven't been able to determine the begin time
1410  * we need to use the earliest available input data. */
1411 
1412  if (begin == 0) {
1413 
1415  "Getting processing period begin time from earliest available input data\n");
1416 
1417  /* Get the IDs of all input datastreams */
1418 
1419  ndsids = dsproc_get_input_datastream_ids(&dsids);
1420  if (ndsids < 0) return(-1);
1421 
1422  for (i = 0; i < ndsids; ++i) {
1423 
1424  dsid = dsids[i];
1425  ntimes = 1;
1426  search.tv_sec = 1;
1427 
1428  if (!dsproc_fetch_timevals(
1429  dsid, &search, NULL, &ntimes, &found)) {
1430 
1431  if (ntimes != 0) {
1432  free(dsids);
1433  return(-1);
1434  }
1435 
1436  continue;
1437  }
1438 
1439  if (begin == 0 || found.tv_sec < begin) {
1440  begin = found.tv_sec;
1441  }
1442  }
1443 
1444  free(dsids);
1445 
1446  if (begin == 0) {
1447 
1449  "No data was found for any input datastreams.\n");
1450 
1452  return(0);
1453  }
1454 
1455  /* Adjust begin time to either midnight or the start of the hour,
1456  * depending on the processing interval.
1457  */
1458 
1459  memset(&gmt, 0, sizeof(struct tm));
1460  gmtime_r(&begin, &gmt);
1461 
1462  if (_DSProc->proc_interval != 3600) {
1463  gmt.tm_hour = 0;
1464  }
1465 
1466  gmt.tm_min = 0;
1467  gmt.tm_sec = 0;
1468 
1469  begin = timegm(&gmt);
1470  }
1471 
1472  if (msngr_debug_level || msngr_provenance_level) {
1473 
1474  format_secs1970(begin, ts1);
1475 
1477  "Processing period begin time: %s\n",
1478  ts1);
1479  }
1480 
1481  _DSProc->period_begin = begin;
1482 
1483  return(1);
1484 }
1485 
1486 /**
1487  * Static: Set the next processing period end time for real time processes.
1488  *
1489  * @return
1490  * - 1 if successful
1491  * - 0 if the process needs to wait for new data
1492  * - -1 if an error occurred
1493  */
1494 static int _set_next_real_time_end(void)
1495 {
1496  int ndsids;
1497  int *dsids;
1498  DataStream *ds;
1499  int dsid;
1500  int i;
1501 
1502  time_t now = time(NULL);
1503  timeval_t search = { 0, 0 };
1504  timeval_t found = { 0, 0 };
1505  size_t ntimes;
1506 
1507  time_t end;
1508  time_t max_end;
1509  time_t delta_t;
1510  time_t begin;
1511  int count;
1512 
1513  char ts1[32];
1514 
1515  /* Determine the end time of the processing period */
1516 
1518  "Determining the processing period end time\n"
1519  " - using %d hours for maximum input data wait time",
1520  (int)(_MaxRealTimeWait/3600 + 0.5));
1521 
1522  end = 0;
1523  max_end = 0;
1524 
1525  /* Get the IDs of all input datastreams */
1526 
1527  ndsids = dsproc_get_input_datastream_ids(&dsids);
1528  if (ndsids < 0) return(-1);
1529 
1530  for (i = 0; i < ndsids; ++i) {
1531 
1532  dsid = dsids[i];
1533  ds = _DSProc->datastreams[dsid];
1534  ntimes = 1;
1535  search.tv_sec = now;
1536 
1537  if (!dsproc_fetch_timevals(
1538  dsid, NULL, &search, &ntimes, &found)) {
1539 
1540  if (ntimes != 0) {
1541  free(dsids);
1542  return(-1);
1543  }
1544 
1545  continue;
1546  }
1547 
1548  /* Adjust for the end time offset. */
1549 
1550  if (ds->ret_cache) {
1551  found.tv_sec -= ds->ret_cache->end_offset;
1552  }
1553 
1554  /* Keep track of the maximum end time found, we will try to use
1555  * this if no new data is found within the maximum wait time. */
1556 
1557  if (max_end == 0 ||
1558  max_end < found.tv_sec) {
1559  max_end = found.tv_sec;
1560  }
1561 
1562  /* We want the earliest end time found within the maximum wait time
1563  * window to ensure we have the most complete dataset possible. */
1564 
1565  delta_t = now - found.tv_sec;
1566 
1567  if (delta_t < _MaxRealTimeWait) {
1568 
1569  if (end == 0 ||
1570  end > found.tv_sec) {
1571  end = found.tv_sec;
1572  }
1573  }
1574  }
1575 
1576  free(dsids);
1577 
1578  if (end == 0) {
1579 
1580  if (max_end == 0) {
1581 
1583  "No new data was found for any input datastreams.\n");
1584 
1586  return(0);
1587  }
1588 
1589  end = max_end;
1590  }
1591 
1592  _DSProc->period_end_max = end;
1593 
1594  /* Adjust end time so end - begin is an even multiple
1595  * of the processing interval.
1596  */
1597 
1598  begin = _DSProc->period_begin;
1599 
1600  if (end > begin) {
1601  count = (int)((end - begin) / _DSProc->proc_interval);
1602  end = begin + (count * _DSProc->proc_interval);
1603  }
1604 
1605  /* Check if we have new data to process.
1606  */
1607 
1608  if (end <= begin) {
1609 
1611  "Missing input data for one or more datastreams.\n"
1612  " -> waiting for input data or the maximum wait time of %d hours is reached",
1613  (int)(_MaxRealTimeWait/3600 + 0.5));
1614 
1616  return(0);
1617  }
1618 
1619  if (msngr_debug_level || msngr_provenance_level) {
1620 
1621  format_secs1970(end, ts1);
1622 
1624  "Processing period end time: %s\n",
1625  ts1);
1626  }
1627 
1628  _DSProc->period_end = end;
1629 
1630  return(1);
1631 }
1632 
1633 /*******************************************************************************
1634  * Private Functions Visible Only To This Library
1635  */
1636 
1637 /**
1638  * Free all memory used by the internal _DSProc structure.
1639  */
1640 void _dsproc_destroy(void)
1641 {
1642  int i;
1643 
1644  if (_LogsRoot) {
1645  free(_LogsRoot);
1646  _LogsRoot = (char *)NULL;
1647  }
1648 
1649  if (_LogsDir) {
1650  free(_LogsDir);
1651  _LogsDir = (char *)NULL;
1652  }
1653 
1654  if (_LogFile) {
1655  free(_LogFile);
1656  _LogFile = (char *)NULL;
1657  }
1658 
1659  if (_LogID) {
1660  free(_LogID);
1661  _LogID = (char *)NULL;
1662  }
1663 
1664  if (_DSProc) {
1665 
1666  if (_DSProc->lockfile_path && _DSProc->lockfile_name) {
1667  _unlock_process();
1668  }
1669 
1671  "Freeing internal memory\n");
1672 
1673  if (_DSProc->site) free((void *)_DSProc->site);
1674  if (_DSProc->facility) free((void *)_DSProc->facility);
1675  if (_DSProc->name) free((void *)_DSProc->name);
1676  if (_DSProc->type) free((void *)_DSProc->type);
1677  if (_DSProc->version) free((void *)_DSProc->version);
1678  if (_DSProc->db_alias) free((void *)_DSProc->db_alias);
1679  if (_DSProc->site_desc) free((void *)_DSProc->site_desc);
1680  if (_DSProc->full_name) free((void *)_DSProc->full_name);
1681 
1682  if (_DSProc->retriever) {
1683  _dsproc_free_retriever();
1684  }
1685 
1686  if (_DSProc->ret_data) {
1687  cds_set_definition_lock(_DSProc->ret_data, 0);
1688  cds_delete_group(_DSProc->ret_data);
1689  }
1690 
1691  if (_DSProc->trans_data) {
1692  cds_set_definition_lock(_DSProc->trans_data, 0);
1693  cds_delete_group(_DSProc->trans_data);
1694  }
1695 
1696  if (_DSProc->trans_params) {
1697  cds_set_definition_lock(_DSProc->trans_params, 0);
1698  cds_delete_group(_DSProc->trans_params);
1699  }
1700 
1701  if (_DSProc->location) dsdb_free_process_location(_DSProc->location);
1702  if (_DSProc->dsc_inputs) dsdb_free_ds_classes(_DSProc->dsc_inputs);
1703  if (_DSProc->dsc_outputs) dsdb_free_ds_classes(_DSProc->dsc_outputs);
1704  if (_DSProc->dsdb) dsdb_destroy(_DSProc->dsdb);
1705  if (_DSProc->dqrdb) dqrdb_destroy(_DSProc->dqrdb);
1706 
1707  if (_DSProc->ndatastreams) {
1708  for (i = 0; i < _DSProc->ndatastreams; i++) {
1709  _dsproc_free_datastream(_DSProc->datastreams[i]);
1710  }
1711  free(_DSProc->datastreams);
1712  }
1713 
1714  free(_DSProc);
1715 
1716  _DSProc = (DSProc *)NULL;
1717  }
1718 
1720  _dsproc_free_exclude_atts();
1721  _dsproc_free_excluded_qc_vars();
1722  _dsproc_free_input_file_list();
1723 }
1724 
1725 /** @publicsection */
1726 
1727 /*******************************************************************************
1728  * Internal Functions Visible To The Public
1729  */
1730 
1731 /**
1732  * Abort the process and exit cleanly.
1733  *
1734  * See convenience macro DSPROC_ABORT.
1735  *
1736  * This function will:
1737  *
1738  * - set the status of the process
1739  * - append the error message to the log file and error mail message
1740  * - call the user's finish_process function (but only if dsproc_abort
1741  * is not being called from there)
1742  * - exit the process cleanly
1743  *
1744  * The status string should be a brief one line error message that will
1745  * be used as the process status in the database. This is the message
1746  * that will be displayed in DSView. If the status string is NULL the
1747  * error message specified by the format string and args will be used.
1748  *
1749  * The format string and args will be used to generate the complete and
1750  * more detailed log and error mail messages. If the format string is
1751  * NULL the status string will be used.
1752  *
1753  * @param func - the name of the function sending the message (__func__)
1754  * @param file - the source file the message came from (__FILE__)
1755  * @param line - the line number in the source file (__LINE__)
1756  * @param status - brief error message to use for the process status.
1757  * @param format - format string for full error message (see printf)
1758  * @param ... - arguments for the format string
1759  */
1761  const char *func,
1762  const char *file,
1763  int line,
1764  const char *status,
1765  const char *format, ...)
1766 {
1767  const char *sender = (_DSProc) ? _DSProc->full_name : "null";
1768  int exit_value;
1769  va_list args;
1770 
1771  if (!_DSProc) {
1772  fprintf(stderr,
1773  "dsproc_abort() called outside main processing framework\n");
1774  exit(1);
1775  }
1776 
1777  if (!func) func = "null";
1778  if (!file) file = "null";
1779 
1780  if (format || status) {
1781 
1782  if (!format) format = status;
1783 
1784  va_start(args, format);
1785 
1786  if (status) {
1787  dsproc_set_status(status);
1788  }
1789  else {
1790  status = msngr_format_va_list(format, args);
1791  if (status) {
1792  dsproc_set_status(status);
1793  free((void *)status);
1794  }
1795  else {
1797  }
1798  }
1799 
1800  msngr_vsend(sender, func, file, line, MSNGR_ERROR, format, args);
1801 
1802  va_end(args);
1803  }
1804  else {
1806  "Error message not set in call to dsproc_abbort()\n");
1807 
1808  dsproc_set_status("Unknown Data Processing Error (check logs)");
1809  }
1810 
1811  if (!_InsideFinishProcessHook) {
1812  _dsproc_run_finish_process_hook();
1813  }
1814 
1815  exit_value = dsproc_finish();
1816  exit(exit_value);
1817 }
1818 
1819 /**
1820  * Enable asynchronous processing mode.
1821  *
1822  * Enabling asynchronous processing mode will allow multiple processes to be
1823  * executed concurrently. This will:
1824  *
1825  * - disable the process lock file
1826  * - disable check for chronological data processing
1827  * - disable overlap checks with previously processed data
1828  * - force a new file to be created for every output dataset
1829  */
1831 {
1833  "Enabling Asynchronous Processing Mode:\n"
1834  " - disabling the process lock file\n"
1835  " - disabling check for chronological data processing\n"
1836  " - disabling overlap checks with previously processed data\n"
1837  " - forcing a new file to be created for every output dataset\n");
1838 
1839  _DisableLockFile = 1;
1840  _AsynchronousMode = 1;
1841 }
1842 
1843 /**
1844  * Disable the datasystem process.
1845  *
1846  * This function will set the status message, and cause the process to
1847  * disable itself when it finishes if not running in force mode.
1848  *
1849  * @param message - disable process message
1850  */
1851 void dsproc_disable(const char *message)
1852 {
1853  int force_mode = dsproc_get_force_mode();
1854 
1855  if (!force_mode) {
1856 
1858  "Setting disable process message: '%s'\n", message);
1859 
1860  strncpy((char *)_DSProc->disable, message, 511);
1861  }
1862  else {
1863 
1865  "Setting status to: '%s'\n", message);
1866  }
1867 
1868  strncpy((char *)_DSProc->status, message, 511);
1869 }
1870 
1871 /**
1872  * Disable the database updates.
1873  *
1874  * Disabling database updates will prevent the process from storing
1875  * runtime status information in the database. This can be used to
1876  * run processes that are connected to a read-only database.
1877  */
1879 {
1880  DEBUG_LV1( DSPROC_LIB_NAME, "Disabling database updates\n");
1881 
1882  _DisableDBUpdates = 1;
1883 }
1884 
1885 /**
1886  * Disable the creation of the process lock file.
1887  *
1888  * Warning: Disabling the lock file will allow multiple processes to run
1889  * over the top of themselves and can lead to unpredictable behavior.
1890  */
1892 {
1893  DEBUG_LV1( DSPROC_LIB_NAME, "Disabling lock file\n");
1894 
1895  _DisableLockFile = 1;
1896 }
1897 
1898 /**
1899  * Disable the mail messages.
1900  */
1902 {
1903  DEBUG_LV1( DSPROC_LIB_NAME, "Disabling mail messages\n");
1904 
1905  _DisableMail = 1;
1906 }
1907 
1908 /**
1909  * Get the asynchronous processing mode.
1910  *
1911  * @return
1912  * - 0 = disabled
1913  * - 1 = enabled
1914  *
1915  * @see dsproc_set_asynchronous_mode()
1916  */
1918 {
1919  return(_AsynchronousMode);
1920 }
1921 
1922 /**
1923  * Get the expected data interval.
1924  *
1925  * This is how often we expect to get data to process.
1926  *
1927  * @return expected data interval
1928  */
1930 {
1931  return(_DSProc->data_interval);
1932 }
1933 
1934 /**
1935  * Get the dynamic dods mode.
1936  *
1937  * @return
1938  * - 0 = disabled
1939  * - 1 = enabled
1940  *
1941  * @see dsproc_set_dynamic_dods_mode()
1942  */
1944 {
1945  return(_DynamicDODs);
1946 }
1947 
1948 /**
1949  * Get the force mode.
1950  *
1951  * The force mode can be enabled using the -F option on the command line.
1952  * This mode can be used to force the process past all recoverable errors
1953  * that would normally stop process execution.
1954  *
1955  * @return
1956  * - 0 = disabled
1957  * - 1 = enabled
1958  *
1959  * @see dsproc_set_force_mode()
1960  */
1962 {
1963  return(_Force);
1964 }
1965 
1966 /**
1967  * Get the input directory being used by an Ingest process.
1968  *
1969  * @return
1970  * - input directory
1971  * - NULL if not set
1972  *
1973  * @see dsproc_set_input_dir()
1974  */
1975 const char *dsproc_get_input_dir(void)
1976 {
1977  if (_InputDir[0]) return((const char *)_InputDir);
1978  return((const char *)NULL);
1979 }
1980 
1981 /**
1982  * Get the current input file being processed by an Ingest.
1983  *
1984  * @return
1985  * - input file
1986  * - NULL if not set
1987  *
1988  * @see dsproc_set_input_source()
1989  */
1990 const char *dsproc_get_input_file(void)
1991 {
1992  if (_InputFile[0]) return((const char *)_InputFile);
1993  return((const char *)NULL);
1994 }
1995 
1996 /**
1997  * Get the full path to the input file being processed by an Ingest.
1998  *
1999  * @return
2000  * - full path to the input file
2001  * - NULL if not set
2002  *
2003  * @see dsproc_set_input_source()
2004  */
2005 const char *dsproc_get_input_source(void)
2006 {
2007  if (_InputSource[0]) return((const char *)_InputSource);
2008  return((const char *)NULL);
2009 }
2010 
2011 /**
2012  * Get the process max run time.
2013  *
2014  * @return max run time
2015  */
2017 {
2018  return(_DSProc->max_run_time);
2019 }
2020 
2021 /**
2022  * Get the minimum valid data time for the process.
2023  *
2024  * @return minimum valid data time for the process
2025  */
2027 {
2028  return(_DSProc->min_valid_time);
2029 }
2030 
2031 /**
2032  * Get the begin and end times of the current processing interval.
2033  *
2034  * @param begin - output: processing interval begin time in seconds since 1970.
2035  * @param end - output: processing interval end time in seconds since 1970.
2036  *
2037  * @return length of the data processing interval in seconds
2038  */
2039 time_t dsproc_get_processing_interval(time_t *begin, time_t *end)
2040 {
2041  if (begin) *begin = _DSProc->interval_begin;
2042  if (end) *end = _DSProc->interval_end;
2043  return(_DSProc->proc_interval);
2044 }
2045 
2046 /**
2047  * Get the real time mode.
2048  *
2049  * @return
2050  * - 0 = disabled
2051  * - 1 = enabled
2052  *
2053  * @see dsproc_set_real_time_mode()
2054  */
2056 {
2057  return(_RealTimeMode);
2058 }
2059 
2060 /**
2061  * Get the reprocessing mode.
2062  *
2063  * @return
2064  * - 0 = disabled
2065  * - 1 = enabled
2066  *
2067  * @see dsproc_set_reprocessing_mode()
2068  */
2070 {
2071  return(_Reprocessing);
2072 }
2073 
2074 /**
2075  * Get the process start time.
2076  *
2077  * @return start time
2078  */
2080 {
2081  return(_DSProc->start_time);
2082 }
2083 
2084 /**
2085  * Get the time remaining until the max run time is reached.
2086  *
2087  * If the max run time has been exceeded a message will be added
2088  * to the log file and the process status will be set appropriately.
2089  *
2090  * @return
2091  * - time remaining until the max run time is reached
2092  * - 0 if the max run time has been exceeded
2093  * - -1 if the max run time has not been set
2094  */
2096 {
2097  time_t remaining;
2098  time_t exceeded;
2099 
2101 
2102  /* All data processing loops should call this function, so we
2103  * add the logic to close all open datastream files that were
2104  * not accessed during the previous processing loop here: */
2105 
2107 
2108  if (_DSProc->max_run_time == (time_t)0) {
2109  return(-1);
2110  }
2111 
2112  remaining = _DSProc->start_time
2113  + _DSProc->max_run_time
2114  - time(NULL);
2115 
2116  if (remaining <= 0) {
2117 
2118  exceeded = abs((int)remaining);
2119 
2121  "Exceeded max run time of %d seconds by %d seconds\n",
2122  (int)_DSProc->max_run_time, (int)exceeded);
2123 
2125  return(0);
2126  }
2127 
2129  "Processing time remaining: %d seconds\n", (int)remaining);
2130 
2131  return(remaining);
2132 }
2133 
2134 /**
2135  * Check if the last status was a fatal error.
2136  *
2137  * This function is used to determine if the process should be
2138  * forced to continue if the force_mode is enabled.
2139  *
2140  * @param last_errno last errno value
2141  *
2142  * @return
2143  * - 1 if a fatal error occurred
2144  * (i.e. memory allocation error, disk I/O error, etc...)
2145  * - 0 if the process should attempt to continue.
2146  */
2147 int dsproc_is_fatal(int last_errno)
2148 {
2149  const char *status = dsproc_get_status();
2150  int fi;
2151 
2152  if (!status) {
2153  status = "<null>";
2154  }
2155 
2157  "Checking for fatal system error:\n"
2158  " - dsproc_status: '%s'\n"
2159  " - errno: %d = '%s'\n",
2160  status, last_errno, strerror(last_errno));
2161 
2162  const char *fatal[] = {
2163 // DSPROC_EBADDSID, // "Invalid Datastream ID"
2164 // DSPROC_EBADINDSC, // "Invalid Input Datastream Class"
2165 // DSPROC_EBADOUTDSC, // "Invalid Output Datastream Class"
2166 // DSPROC_EBADOUTFORMAT, // "Invalid Output Datastream Format"
2167  DSPROC_EBADRETRIEVER, // "Invalid Retriever Definition"
2168 // DSPROC_EBASETIME, // "Could Not Get Base Time For Time Variable"
2169 // DSPROC_EBOUNDSVAR, // "Invalid Cell Boundary Variable or Definition"
2170  DSPROC_ECDSALLOCVAR, // "Could Not Allocate Memory For Dataset Variable"
2171 // DSPROC_ECDSCHANGEATT, // "Could Not Change Attribute Value In Dataset"
2172 // DSPROC_ECDSCOPY, // "Could Not Copy Dataset Metadata"
2173 // DSPROC_ECDSDEFVAR, // "Could Not Define Dataset Variable"
2174 // DSPROC_ECDSDELVAR, // "Could Not Delete Dataset Variable"
2175 // DSPROC_ECDSGETTIME, // "Could Not Get Time Values From Dataset"
2176 // DSPROC_ECDSSETATT, // "Could Not Set Attribute Value In Dataset"
2177 // DSPROC_ECDSSETDATA, // "Could Not Set Variable Data In Dataset"
2178 // DSPROC_ECDSSETDIM, // "Could Not Set Dimension Length In Dataset"
2179 // DSPROC_ECDSSETTIME, // "Could Not Set Time Values In Dataset"
2180 // DSPROC_ECLONEVAR, // "Could Not Clone Dataset Variable"
2181 // DSPROC_EDATAATTTYPE, // "Data Attribute Has Invalid Data Type"
2182  DSPROC_EDBCONNECT, // "Database Connection Error"
2183  DSPROC_EDBERROR, // "Database Error (see log file)"
2184  DSPROC_EDESTDIRMAKE, // "Could Not Create Destination Directory"
2185  DSPROC_EDIRLIST, // "Could Not Get Directory Listing"
2186  DSPROC_EDQRDBCONNECT, // "DQR Database Connection Error"
2187  DSPROC_EDQRDBERROR, // "DQR Database Error (see log file)"
2188  DSPROC_EDSPATH, // "Could Not Determine Path To Datastream"
2189  DSPROC_EFILECOPY, // "Could Not Copy File"
2190  DSPROC_EFILEMD5, // "Could Not Get File MD5"
2191  DSPROC_EFILEMOVE, // "Could Not Move File"
2192  DSPROC_EFILEOPEN, // "Could Not Open File"
2193  DSPROC_EFILEREAD // "Could Not Read From File"
2194  DSPROC_EFILEWRITE // "Could Not Write To File"
2195  DSPROC_EFILESTATS, // "Could Not Get File Stats"
2196  DSPROC_EFORCE, // "Could Not Force Process To Continue"
2197  DSPROC_EFORK, // "Could Not Create Fork For New Process"
2198 // DSPROC_EGLOBALATT, // "Invalid Global Attribute Value"
2199 // DSPROC_EINITSIGS, // "Could Not Initialize Signal Handlers"
2200  DSPROC_ELOGOPEN, // "Could Not Open Log File"
2201  DSPROC_ELOGSPATH, // "Could Not Determine Path To Logs Directory"
2202 // DSPROC_EMAILINIT, // "Could Not Initialize Mail"
2203  DSPROC_EMD5CHECK, // "Source And Destination File MD5s Do Not Match"
2204 // DSPROC_EMERGE, // "Could Not Merge Datasets"
2205 // DSPROC_EMINTIME, // "Found Data Time Before Minimum Valid Time"
2206 // DSPROC_ENCCLOSE, // "Could Not Close NetCDF File"
2207  DSPROC_ENCCREATE, // "Could Not Create NetCDF File"
2208  DSPROC_ENCOPEN, // "Could Not Open NetCDF File"
2209  DSPROC_ENCREAD, // "Could Not Read From NetCDF File"
2210  DSPROC_ENCSYNC, // "Could Not Sync NetCDF File"
2211  DSPROC_ENCWRITE, // "Could Not Write To NetCDF File"
2212 // DSPROC_ENODATA, // "No Input Data Found"
2213 // DSPROC_ENOOUTDATA, // "No Output Data Created"
2214  DSPROC_ENODOD, // "DOD Not Defined In Database"
2215 // DSPROC_ENOFILETIME, // "Could Not Determine File Time"
2216 // DSPROC_ENOINDSC, // "Could Not Find Input Datastream Class In Database"
2217  DSPROC_ENOMEM, // "Memory Allocation Error"
2218  DSPROC_ENORETRIEVER, // "Could Not Find Retriever Definition In Database"
2219 // DSPROC_ENOSRCFILE, // "Source File Does Not Exist"
2220 // DSPROC_EPROVOPEN, // "Could Not Open Provenance Log"
2221 // DSPROC_EQCVARDIMS, // "Invalid QC Variable Dimensions"
2222 // DSPROC_EQCVARTYPE, // "Invalid Data Type For QC Variable"
2223 // DSPROC_EREGEX, // "Regular Expression Error"
2224 // DSPROC_EREQVAR, // "Required Variable Missing From Dataset"
2225 // DSPROC_EREQATT // "Required Attribute Variable Missing From Variable or Dataset"
2226 // DSPROC_ERETRIEVER, // "Could Not Retrieve Input Data"
2227 // DSPROC_ERUNTIME, // "Maximum Run Time Limit Exceeded"
2228 // DSPROC_ESAMPLESIZE, // "Invalid Variable Sample Size"
2229 // DSPROC_ETIMECALC, // "Time Calculation Error"
2230 // DSPROC_ETIMEORDER, // "Invalid Time Order"
2231 // DSPROC_ETIMEOVERLAP, // "Found Overlapping Data Times"
2232  DSPROC_ETOOMANYINDSC, // "Too Many Input Datastreams Defined In Database"
2233 // DSPROC_ETRANSFORM, // "Could Not Transform Input Data"
2234  DSPROC_ETRANSPARAMLOAD, // "Could Not Load Transform Parameters File"
2235 // DSPROC_ETRANSQCVAR, // "Could Not Create Consolidated Transformation QC Variable"
2236  DSPROC_EUNLINK, // "Could Not Delete File"
2237 // DSPROC_EVARMAP, // "Could Not Map Input Variable To Output Variable"
2238 // DSPROC_ECSVPARSER, // "Could Not Parse CSV File"
2239  DSPROC_ECSVCONF, // "Could Not Read CSV Ingest Configuration File"
2240 // DSPROC_ECSV2CDS, // "Could Not Map Input CSV Data To Output Dataset"
2241  NULL
2242  };
2243 
2244  for (fi = 0; fatal[fi]; ++fi) {
2245  if (strcmp(status, fatal[fi]) == 0) {
2246 
2248  " - dsproc_status indicates a fatal system error\n");
2249 
2250  return(1);
2251  }
2252  }
2253 
2254  /* Check the last errno variable */
2255 
2256  switch (last_errno) {
2257 
2258  case EPERM: /* Operation not permitted */
2259 // case ENOENT: /* No such file or directory */
2260 // case ESRCH: /* No such process */
2261  case EINTR: /* Interrupted system call */
2262  case EIO: /* I/O error */
2263  case ENXIO: /* No such device or address */
2264 // case E2BIG: /* Argument list too long */
2265 // case ENOEXEC: /* Exec format error */
2266 // case EBADF: /* Bad file number */
2267 // case ECHILD: /* No child processes */
2268 // case EAGAIN: /* Try again */
2269  case ENOMEM: /* Out of memory */
2270  case EACCES: /* Permission denied */
2271 // case EFAULT: /* Bad address */
2272 // case ENOTBLK: /* Block device required */
2273  case EBUSY: /* Device or resource busy */
2274 // case EEXIST: /* File exists */
2275 // case EXDEV: /* Cross-device link */
2276  case ENODEV: /* No such device */
2277 // case ENOTDIR: /* Not a directory */
2278 // case EISDIR: /* Is a directory */
2279 // case EINVAL: /* Invalid argument */
2280 // case ENFILE: /* File table overflow */
2281 // case EMFILE: /* Too many open files */
2282 // case ENOTTY: /* Not a typewriter */
2283  case ETXTBSY: /* Text file busy */
2284 // case EFBIG: /* File too large */
2285  case ENOSPC: /* No space left on device */
2286 // case ESPIPE: /* Illegal seek */
2287  case EROFS: /* Read-only file system */
2288 // case EMLINK: /* Too many links */
2289 // case EPIPE: /* Broken pipe */
2290 // case EDOM: /* Math argument out of domain of func */
2291 // case ERANGE: /* Math result not representable */
2292 
2293 // case EDEADLK: /* Resource deadlock would occur */
2294 // case ENAMETOOLONG: /* File name too long */
2295 // case ENOLCK: /* No record locks available */
2296 // case ENOSYS: /* Function not implemented */
2297 // case ENOTEMPTY: /* Directory not empty */
2298 // case ELOOP: /* Too many symbolic links encountered */
2299 // case EWOULDBLOCK: /* Operation would block */
2300 // case ENOMSG: /* No message of desired type */
2301 // case EIDRM: /* Identifier removed */
2302 // case ECHRNG: /* Channel number out of range */
2303 // case EL2NSYNC: /* Level 2 not synchronized */
2304 // case EL3HLT: /* Level 3 halted */
2305 // case EL3RST: /* Level 3 reset */
2306 // case ELNRNG: /* Link number out of range */
2307 // case EUNATCH: /* Protocol driver not attached */
2308 // case ENOCSI: /* No CSI structure available */
2309 // case EL2HLT: /* Level 2 halted */
2310 // case EBADE: /* Invalid exchange */
2311 // case EBADR: /* Invalid request descriptor */
2312 // case EXFULL: /* Exchange full */
2313 // case ENOANO: /* No anode */
2314 // case EBADRQC: /* Invalid request code */
2315 // case EBADSLT: /* Invalid slot */
2316 
2317 // case EBFONT: /* Bad font file format */
2318 // case ENOSTR: /* Device not a stream */
2319 // case ENODATA: /* No data available */
2320 // case ETIME: /* Timer expired */
2321  case ENOSR: /* Out of streams resources */
2322 // !OSX case ENONET: /* Machine is not on the network */
2323 // !OSX case ENOPKG: /* Package not installed */
2324 // case EREMOTE: /* Object is remote */
2325  case ENOLINK: /* Link has been severed */
2326 // !OSX case EADV: /* Advertise error */
2327 // !OSX case ESRMNT: /* Srmount error */
2328 // !OSX case ECOMM: /* Communication error on send */
2329 // case EPROTO: /* Protocol error */
2330 // case EMULTIHOP: /* Multihop attempted */
2331 // case EDOTDOT: /* RFS specific error */
2332 // case EBADMSG: /* Not a data message */
2333 // case EOVERFLOW: /* Value too large for defined data type */
2334 // case ENOTUNIQ: /* Name not unique on network */
2335 // !OSX case EBADFD: /* File descriptor in bad state */
2336 // case EREMCHG: /* Remote address changed */
2337 // !OSX case ELIBACC: /* Can not access a needed shared library */
2338 // !OSX case ELIBBAD: /* Accessing a corrupted shared library */
2339 // !OSX case ELIBSCN: /* .lib section in a.out corrupted */
2340 // !OSX case ELIBMAX: /* Attempting to link in too many shared libraries */
2341 // !OSX case ELIBEXEC: /* Cannot exec a shared library directly */
2342  case EILSEQ: /* Illegal byte sequence */
2343 // !OSX case ERESTART: /* Interrupted system call should be restarted */
2344 // !OSX case ESTRPIPE: /* Streams pipe error */
2345 // case EUSERS: /* Too many users */
2346  case ENOTSOCK: /* Socket operation on non-socket */
2347 // case EDESTADDRREQ: /* Destination address required */
2348 // case EMSGSIZE: /* Message too long */
2349  case EPROTOTYPE: /* Protocol wrong type for socket */
2350  case ENOPROTOOPT: /* Protocol not available */
2351  case EPROTONOSUPPORT: /* Protocol not supported */
2352  case ESOCKTNOSUPPORT: /* Socket type not supported */
2353  case EOPNOTSUPP: /* Operation not supported on transport endpoint */
2354  case EPFNOSUPPORT: /* Protocol family not supported */
2355  case EAFNOSUPPORT: /* Address family not supported by protocol */
2356  case EADDRINUSE: /* Address already in use */
2357  case EADDRNOTAVAIL: /* Cannot assign requested address */
2358  case ENETDOWN: /* Network is down */
2359  case ENETUNREACH: /* Network is unreachable */
2360  case ENETRESET: /* Network dropped connection because of reset */
2361  case ECONNABORTED: /* Software caused connection abort */
2362  case ECONNRESET: /* Connection reset by peer */
2363  case ENOBUFS: /* No buffer space available */
2364  case EISCONN: /* Transport endpoint is already connected */
2365  case ENOTCONN: /* Transport endpoint is not connected */
2366  case ESHUTDOWN: /* Cannot send after transport endpoint shutdown */
2367  case ETOOMANYREFS: /* Too many references: cannot splice */
2368  case ETIMEDOUT: /* Connection timed out */
2369  case ECONNREFUSED: /* Connection refused */
2370  case EHOSTDOWN: /* Host is down */
2371  case EHOSTUNREACH: /* No route to host */
2372  case EALREADY: /* Operation already in progress */
2373  case EINPROGRESS: /* Operation now in progress */
2374  case ESTALE: /* Stale NFS file handle */
2375 // case EUCLEAN: /* Structure needs cleaning */
2376 // case ENOTNAM: /* Not a XENIX named type file */
2377 // case ENAVAIL: /* No XENIX semaphores available */
2378 // case EISNAM: /* Is a named type file */
2379 // !OSX case EREMOTEIO: /* Remote I/O error */
2380  case EDQUOT: /* Quota exceeded */
2381 
2382 // case ENOMEDIUM: /* No medium found */
2383 // case EMEDIUMTYPE: /* Wrong medium type */
2384  case ECANCELED: /* Operation Canceled */
2385 // !OSX case ENOKEY: /* Required key not available */
2386 // !OSX case EKEYEXPIRED: /* Key has expired */
2387 // !OSX case EKEYREVOKED: /* Key has been revoked */
2388 // !OSX case EKEYREJECTED: /* Key was rejected by service */
2389 
2390 /* for robust mutexes */
2391  case EOWNERDEAD: /* Owner died */
2392  case ENOTRECOVERABLE: /* State not recoverable */
2393 // !OSX case ERFKILL: /* Operation not possible due to RF-kill */
2394 // !OSX case EHWPOISON: /* Memory page has hardware error */
2395 
2397  " - last errno indicates a fatal system error\n");
2398 
2399  return(1);
2400  default:
2401  break;
2402  }
2403 
2405  " - not a fatal system error\n");
2406 
2407  return(0);
2408 }
2409 
2410 /**
2411  * Set Dynamic DODs mode.
2412  *
2413  * If the dynamic dods mode is enabled, the output DODs will be created
2414  * and/or modified using all variables and associated attributes that
2415  * are mapped to it.
2416  *
2417  * @param mode - dynamic dods mode (0 = disabled, 1 = enabled)
2418  *
2419  * @see dsproc_get_dynamic_dods_mode()
2420  */
2422 {
2424  "Setting dynamic DODs mode to: %d\n", mode);
2425 
2426  _DynamicDODs = mode;
2427 }
2428 
2429 /**
2430  * Set the force mode.
2431  *
2432  * The force mode can be enabled using the -F option on the command line.
2433  * This mode can be used to force the process past all recoverable errors
2434  * that would normally stop process execution.
2435  *
2436  * @param mode - force mode (0 = disabled, 1 = enabled)
2437  *
2438  * @see dsproc_get_force_mode()
2439  */
2441 {
2443  "Setting force mode to: %d\n", mode);
2444 
2445  _Force = mode;
2446 }
2447 
2448 /**
2449  * Set the input directory used to create the input_source attribute.
2450  *
2451  * This function is called from the main Ingest files loop to set the
2452  * current input directory being used by the Ingest. When new datasets
2453  * are created this value will be used to populate the input_source
2454  * global attribute value if it is defined in the DOD.
2455  *
2456  * @param input_dir - full path to the input directory
2457  */
2458 void dsproc_set_input_dir(const char *input_dir)
2459 {
2461  "Setting input directory: %s\n", input_dir);
2462 
2463  strncpy(_InputDir, input_dir, PATH_MAX);
2464 }
2465 
2466 /**
2467  * Set the input file used to create the input_source attribute.
2468  *
2469  * This function is called from the main Ingest files loop to set the
2470  * current input file being used by the Ingest. When new datasets
2471  * are created this value will be used to populate the input_source
2472  * global attribute value if it is defined in the DOD.
2473  *
2474  * @param input_file - the name of the input file being processed
2475  */
2476 void dsproc_set_input_source(const char *input_file)
2477 {
2479  "Setting input source: %s/%s\n", _InputDir, input_file);
2480 
2481  strncpy(_InputFile, input_file, PATH_MAX);
2482  snprintf(_InputSource, PATH_MAX, "%s/%s", _InputDir, input_file);
2483 }
2484 
2485 /**
2486  * Set Log file directory.
2487  *
2488  * @param log_dir - full path to the log files directory
2489  *
2490  * @return
2491  * - 1 if successful
2492  * - 0 if a memory allocation error occurred
2493  */
2494 int dsproc_set_log_dir(const char *log_dir)
2495 {
2496  _LogsDir = strdup(log_dir);
2497 
2498  if (!_LogsDir) {
2499 
2501  "Could not set log file directory: %s\n"
2502  " -> memory allocation error\n",
2503  log_dir);
2504 
2506  return(0);
2507  }
2508 
2509  return(1);
2510 }
2511 
2512 /**
2513  * Set the name of the log file to use.
2514  *
2515  * @param log_file - name of the log file
2516  *
2517  * @return
2518  * - 1 if successful
2519  * - 0 if a memory allocation error occurred
2520  */
2521 int dsproc_set_log_file(const char *log_file)
2522 {
2523  _LogFile = strdup(log_file);
2524 
2525  if (!_LogFile) {
2526 
2528  "Could not set log file name: %s\n"
2529  " -> memory allocation error\n",
2530  log_file);
2531 
2533  return(0);
2534  }
2535 
2536  return(1);
2537 }
2538 
2539 /**
2540  * Replace timestamp in log file name with Log ID.
2541  *
2542  * @param log_id - log file ID
2543  *
2544  * @return
2545  * - 1 if successful
2546  * - 0 if a memory allocation error occurred
2547  */
2548 int dsproc_set_log_id(const char *log_id)
2549 {
2550  _LogID = strdup(log_id);
2551 
2552  if (!_LogID) {
2553 
2555  "Could not set log file ID: %s\n"
2556  " -> memory allocation error\n",
2557  log_id);
2558 
2560  return(0);
2561  }
2562 
2563  return(1);
2564 }
2565 
2566 /**
2567  * Set Log file interval.
2568  *
2569  * @param interval - log file interval:
2570  * - LOG_MONTHLY = create monthly log files
2571  * - LOG_DAILY = create daily log files
2572  * - LOG_RUN = create one log file per run
2573  *
2574  * @param use_begin_time - VAP Only: flag indicating if the begin time
2575  * specified on the command line should be used
2576  * for the log file time.
2577  */
2578 void dsproc_set_log_interval(LogInterval interval, int use_begin_time)
2579 {
2580  _LogInterval = interval;
2581  _LogDataTime = use_begin_time;
2582 }
2583 
2584 /**
2585  * Set the maximum runtime allowed for the process.
2586  *
2587  * Calling this function will override the maximum runtime limit set
2588  * in the database.
2589  *
2590  * @param max_runtime - max runtime allowed for the process.
2591  */
2592 void dsproc_set_max_runtime(int max_runtime)
2593 {
2595  "Setting maximum runtime to: %d\n", max_runtime);
2596 
2597  _MaxRunTime = max_runtime;
2598 }
2599 
2600 /**
2601  * Set the begin and end times for the current processing interval.
2602  *
2603  * This function can be used to override the begin and end times
2604  * of the current processing interval and should be called from
2605  * the pre-retrieval hook function.
2606  *
2607  * @param begin_time - begin time in seconds since 1970.
2608  * @param end_time - end time in seconds since 1970.
2609  */
2611  time_t begin_time,
2612  time_t end_time)
2613 {
2614  char ts1[32], ts2[32];
2615 
2616  _DSProc->interval_begin = begin_time;
2617  _DSProc->interval_end = end_time;
2618  _DSProc->proc_interval = end_time - begin_time;
2619 
2621  "Setting processing interval:\n"
2622  " - begin time: %s\n"
2623  " - end time: %s\n"
2624  " - interval: %d seconds\n",
2625  format_secs1970(begin_time, ts1),
2626  format_secs1970(end_time, ts2),
2627  _DSProc->proc_interval);
2628 }
2629 
2630 /**
2631  * Set the offset to apply to the processing interval.
2632  *
2633  * This function can be used to shift the processing interval and
2634  * should be called from either the init-process or pre-retrieval
2635  * hook function.
2636  *
2637  * @param offset - offset in seconds
2638  */
2640 {
2642  "Setting processing interval offset to: %d seconds\n", (int)offset);
2643 
2644  _DSProc->interval_offset = offset;
2645 }
2646 
2647 /**
2648  * Set the reprocessing mode.
2649  *
2650  * If the reprocessing mode is enabled, the time validation functions will
2651  * not check if the data time is earlier than that of the latest processed
2652  * data time.
2653  *
2654  * @param mode - reprocessing mode (0 = disabled, 1 = enabled)
2655  *
2656  * @see dsproc_get_reprocessing_mode()
2657  */
2659 {
2661  "Setting reprocessing mode to: %d\n", mode);
2662 
2663  _Reprocessing = mode;
2664 }
2665 
2666 /**
2667  * Set the real time mode.
2668  *
2669  * If the real time mode is enabled, the -b option will not be required
2670  * on the command line. Instead the the end of the last processing interval
2671  * will be tracked and used as the start of the next processing interval.
2672  *
2673  * @param mode - real time mode (0 = disabled, 1 = enabled)
2674  * @param max_wait - maximum wait time for new data in hours
2675  *
2676  * @see dsproc_get_reprocessing_mode()
2677  */
2678 void dsproc_set_real_time_mode(int mode, float max_wait)
2679 {
2681  "Setting real time mode to: %d\n"
2682  " -> max wait time = %g hours\n",
2683  mode, max_wait);
2684 
2685  _RealTimeMode = mode;
2686 
2687  if (max_wait >= 0) {
2688  _MaxRealTimeWait = (int)(max_wait * 3600.0);
2689  }
2690 }
2691 
2692 /**
2693  * Initialize a data system process.
2694  *
2695  * This function will:
2696  * - Parse the command line
2697  * - Connect to the database
2698  * - Open the process log file
2699  * - Initialize the mail messages
2700  * - Update the process start time in the database
2701  * - Initialize the signal handlers
2702  * - Define non-standard unit symbols
2703  * - Get process configuration information from database
2704  * - Initialize input and output datastreams
2705  * - Initialize the data retrieval structures
2706  *
2707  * The database connection will be left open when this function returns
2708  * to allow the user's init_process() function to access the database without
2709  * the need to reconnect to it. The database connection should be closed
2710  * after the user's init_process() function returns.
2711  *
2712  * The program will terminate inside this function if the -h (help) or
2713  * -v (version) options are specified on the command line (exit value 0),
2714  * or if an error occurs (exit value 1).
2715  *
2716  * @param argc - command line argument count
2717  * @param argv - command line argument vector
2718  * @param proc_model - processing model to use
2719  * @param proc_version - process version
2720  * @param nproc_names - number of valid process names
2721  * @param proc_names - list of valid process names
2722  */
2724  int argc,
2725  char **argv,
2726  ProcModel proc_model,
2727  const char *proc_version,
2728  int nproc_names,
2729  const char **proc_names)
2730 {
2731  const char *program_name = argv[0];
2732  time_t start_time = time(NULL);
2733  const char *site;
2734  const char *facility;
2735  const char *proc_name;
2736  const char *proc_type;
2737  int db_attempts;
2738  FamProc *fam_proc;
2739  char *config_value;
2740  int status;
2741  int exit_value;
2742 
2743  /************************************************************
2744  * Create the DSPROC structure
2745  *************************************************************/
2746 
2747  if (_DSProc) {
2748  dsproc_finish();
2749  }
2750 
2751  _DSProc = (DSProc *)calloc(1, sizeof(DSProc));
2752  if (!_DSProc) {
2753 
2754  fprintf(stderr,
2755  "%s: Memory allocation error initializing process\n",
2756  program_name);
2757 
2758  exit(1);
2759  }
2760 
2761  _DSProc->start_time = start_time;
2762  _DSProc->model = proc_model;
2763 
2764  /* set version */
2765 
2766  if (proc_version) {
2767  if (!(_DSProc->version = strdup(proc_version))) {
2768  goto MEMORY_ERROR;
2769  }
2770  _dsproc_trim_version((char *)_DSProc->version);
2771  }
2772  else {
2773  if (!(_DSProc->version = strdup("Unknown"))) {
2774  goto MEMORY_ERROR;
2775  }
2776  }
2777 
2778  /* set process name if not from the command line */
2779 
2780  if (nproc_names == 1) {
2781  if (!(_DSProc->name = strdup(proc_names[0]))) {
2782  goto MEMORY_ERROR;
2783  }
2784  }
2785 
2786  /************************************************************
2787  * Set process type and parse command line arguments
2788  *************************************************************/
2789 
2790  if (proc_model & DSP_INGEST) {
2791 
2792  if (!(_DSProc->type = strdup("Ingest"))) {
2793  goto MEMORY_ERROR;
2794  }
2795 
2796  if (proc_model & DSP_RETRIEVER ||
2797  proc_model & DSP_TRANSFORM) {
2798 
2799  /* Ingest/VAP Hybrid so set real-time mode
2800  * use VAP parse args */
2801 
2802  if (!dsproc_get_real_time_mode()) {
2804  }
2805 
2806  _dsproc_vap_parse_args(argc, argv, nproc_names, proc_names);
2807  }
2808  else {
2809  _dsproc_ingest_parse_args(argc, argv, nproc_names, proc_names);
2810  }
2811  }
2812  else {
2813  if (!(_DSProc->type = strdup("VAP"))) {
2814  goto MEMORY_ERROR;
2815  }
2816  _dsproc_vap_parse_args(argc, argv, nproc_names, proc_names);
2817  }
2818 
2819  if (!(_DSProc->full_name = msngr_create_string(
2820  "%s-%s", _DSProc->name, _DSProc->type))) {
2821 
2822  goto MEMORY_ERROR;
2823  }
2824 
2825  /************************************************************
2826  * Initialize the process
2827  *************************************************************/
2828 
2829  site = _DSProc->site;
2830  facility = _DSProc->facility;
2831  proc_name = _DSProc->name;
2832  proc_type = _DSProc->type;
2833 
2835  "INITIALIZING PROCESS: %s%s-%s-%s\n",
2836  site, facility, proc_name, proc_type);
2837 
2838  if (!_DSProc->db_alias) {
2839  if (!(_DSProc->db_alias = strdup("dsdb_data"))) {
2840  goto MEMORY_ERROR;
2841  }
2842  }
2843 
2844  /************************************************************
2845  * Create the lockfile for this process
2846  *************************************************************/
2847 
2848  if (!_DisableLockFile) {
2849  if (!_lock_process(site, facility, proc_name, proc_type)) {
2850  _dsproc_destroy();
2851  exit(1);
2852  }
2853  }
2854 
2855  /************************************************************
2856  * Connect to the database
2857  *************************************************************/
2858 
2860  "Initializing database connection: %s\n", _DSProc->db_alias);
2861 
2862  _DSProc->dsdb = dsdb_create(_DSProc->db_alias);
2863  if (!_DSProc->dsdb) {
2864 
2866  "%s%s-%s-%s: Could not initialize database connection\n",
2867  site, facility, proc_name, proc_type);
2868 
2869  _dsproc_destroy();
2870  exit(1);
2871  }
2872 
2873  db_attempts = dsdb_connect(_DSProc->dsdb);
2874 
2875  if (db_attempts == 0) {
2876 
2878  "%s%s-%s-%s: Could not connect to database\n",
2879  site, facility, proc_name, proc_type);
2880 
2881  _dsproc_destroy();
2882  exit(1);
2883  }
2884 
2885  if (msngr_debug_level) {
2886 
2887  if (_DSProc->dsdb->dbconn->db_host[0] != '\0') {
2889  " - db_host: %s\n", _DSProc->dsdb->dbconn->db_host);
2890  }
2891 
2892  if (_DSProc->dsdb->dbconn->db_name[0] != '\0') {
2894  " - db_name: %s\n", _DSProc->dsdb->dbconn->db_name);
2895  }
2896 
2897  if (_DSProc->dsdb->dbconn->db_user[0] != '\0') {
2899  " - db_user: %s\n", _DSProc->dsdb->dbconn->db_user);
2900  }
2901  }
2902 
2903  if (_DSProc->dsdb->dbconn->db_type == DB_WSPC) {
2904 
2906  " - using read-only web service connection\n"
2907  " - disabled database updates\n"
2908  " - disabled mail messages\n");
2909 
2910  _DisableDBUpdates = 1;
2911  _DisableMail = 1;
2912  }
2913 
2914  /************************************************************
2915  * Make sure this is a valid datasystem process
2916  *************************************************************/
2917 
2918  status = dsdb_get_family_process(
2919  _DSProc->dsdb, site, facility, proc_type, proc_name, &fam_proc);
2920 
2921  if (status <= 0) {
2922 
2924  "%s%s-%s-%s: Process not found in database\n",
2925  site, facility, proc_name, proc_type);
2926 
2927  _dsproc_destroy();
2928  exit(1);
2929  }
2930 
2931  dsdb_free_family_process(fam_proc);
2932 
2933  /************************************************************
2934  * Open the provenance log
2935  *************************************************************/
2936 
2937  if (msngr_provenance_level) {
2938 
2939  if (!_init_provenance_log(site, facility, proc_name, proc_type)) {
2940  _dsproc_destroy();
2941  exit(1);
2942  }
2943 
2945  "Initializing process: %s%s-%s-%s\n",
2946  site, facility, proc_name, proc_type);
2947  }
2948 
2950  "Process version: %s\n", _DSProc->version);
2951 
2953  "Library versions:\n"
2954  " - libdsproc3: %s\n"
2955  " - libdsdb3: %s\n"
2956  " - libtrans: %s\n"
2957  " - libcds3: %s\n"
2958  " - libncds3: %s\n"
2959  " - libarmutils: %s\n"
2960  " - libdbconn: %s\n"
2961  " - libmsngr: %s\n",
2963  dsdb_lib_version(),
2964  trans_lib_version(),
2965  cds_lib_version(),
2966  ncds_lib_version(),
2968  dbconn_lib_version(),
2969  msngr_lib_version());
2970 
2971  if (msngr_provenance_level) {
2972 
2973  if (_DSProc->lockfile_path && _DSProc->lockfile_name) {
2974 
2976  "Created process lockfile:\n"
2977  " - path: %s\n"
2978  " - name: %s\n",
2979  _DSProc->lockfile_path,
2980  _DSProc->lockfile_name);
2981  }
2982 
2984  "Using database connection:\n");
2985 
2986  if (_DSProc->dsdb->dbconn->db_host[0] != '\0') {
2988  " - db_host: %s\n", _DSProc->dsdb->dbconn->db_host);
2989  }
2990 
2991  if (_DSProc->dsdb->dbconn->db_name[0] != '\0') {
2993  " - db_name: %s\n", _DSProc->dsdb->dbconn->db_name);
2994  }
2995 
2996  if (_DSProc->dsdb->dbconn->db_user[0] != '\0') {
2998  " - db_user: %s\n", _DSProc->dsdb->dbconn->db_user);
2999  }
3000 
3001  if (_DSProc->dsdb->dbconn->db_type == DB_WSPC) {
3003  " - using read-only web service connection\n"
3004  " - disabled database updates\n"
3005  " - disabled mail messages\n");
3006  }
3007  }
3008 
3009  /************************************************************
3010  * Open the log file
3011  *************************************************************/
3012 
3013  if (!_init_process_log(site, facility, proc_name, proc_type)) {
3014  _dsproc_destroy();
3015  exit(1);
3016  }
3017 
3018  /* Log the number of database connect attempts (if greater than 1) */
3019 
3020  if (db_attempts > 1) {
3021 
3023  "\nDB_ATTEMPTS: It took %d attempts to connect to the database.\n",
3024  db_attempts);
3025  }
3026 
3027  /************************************************************
3028  * After this point the dsproc_finish() function should be
3029  * used to cleanup before exiting.
3030  *************************************************************/
3031 
3032  if (!_dsproc_init()) {
3033  exit_value = dsproc_finish();
3034  exit(exit_value);
3035  }
3036 
3037  if (proc_model == PM_INGEST) {
3038 
3039  /************************************************************
3040  * Initialize an Ingest process
3041  *************************************************************/
3042 
3043  /* Initialize the input datastreams */
3044 
3045  if (!_dsproc_init_input_datastreams()) {
3046  exit_value = dsproc_finish();
3047  exit(exit_value);
3048  }
3049 
3050  /* Initialize the output datastreams */
3051 
3052  if (!_dsproc_init_output_datastreams()) {
3053  exit_value = dsproc_finish();
3054  exit(exit_value);
3055  }
3056  }
3057  else {
3058 
3059  /************************************************************
3060  * Initialize a VAP process
3061  *************************************************************/
3062 
3063  /* Initialize the output datastreams */
3064 
3065  if (!_dsproc_init_output_datastreams()) {
3066  exit_value = dsproc_finish();
3067  exit(exit_value);
3068  }
3069 
3070  /* Initialize the Retriever */
3071 
3072  if (!_dsproc_init_retriever()) {
3073  exit_value = dsproc_finish();
3074  exit(exit_value);
3075  }
3076 
3077  if ((proc_model & DSP_RETRIEVER_REQUIRED) &&
3078  !_DSProc->retriever) {
3079 
3081  "Could not find retriever definition in database\n");
3082 
3084  exit_value = dsproc_finish();
3085  exit(exit_value);
3086  }
3087 
3088  /* Get the data processing interval */
3089 
3090  status = dsproc_get_config_value("processing_interval", &config_value);
3091 
3092  if (status == 1) {
3093  _DSProc->proc_interval = (time_t)atoi(config_value);
3094  free(config_value);
3095 
3096  if (_DSProc->proc_interval <= 0) {
3097  status = 0;
3098  }
3099  }
3100  else if (status < 0) {
3101  exit_value = dsproc_finish();
3102  exit(exit_value);
3103  }
3104 
3105  if (status == 0) {
3106 
3107  if (_DSProc->cmd_line_end > _DSProc->cmd_line_begin) {
3108 
3109  _DSProc->proc_interval =
3110  _DSProc->cmd_line_end - _DSProc->cmd_line_begin;
3111 
3113  "Processing interval not defined or <= 0:\n"
3114  " - using interval between begin and end times specified on command line: %d seconds\n",
3115  _DSProc->proc_interval);
3116  }
3117  else {
3118 
3119  _DSProc->proc_interval = 86400;
3120 
3122  "Processing interval not defined or <= 0:\n"
3123  " - using default value: %d seconds\n",
3124  _DSProc->proc_interval);
3125  }
3126  }
3127 
3128  /* Set the processing period. */
3129 
3130  if (_DSProc->cmd_line_begin) {
3131  _DSProc->period_begin = _DSProc->cmd_line_begin;
3132  }
3133  else {
3134 
3135  status = _set_next_real_time_begin();
3136  if (status <= 0) {
3137  exit_value = dsproc_finish();
3138  exit(exit_value);
3139  }
3140  }
3141 
3142  if (_DSProc->cmd_line_end) {
3143  _DSProc->period_end = _DSProc->cmd_line_end;
3144  }
3145  else if (_DSProc->cmd_line_begin) {
3146  _DSProc->period_end =
3147  _DSProc->cmd_line_begin + _DSProc->proc_interval;
3148  }
3149  else {
3150 
3151  status = _set_next_real_time_end();
3152  if (status <= 0) {
3153  exit_value = dsproc_finish();
3154  exit(exit_value);
3155  }
3156  }
3157  }
3158 
3159  return;
3160 
3161 MEMORY_ERROR:
3162 
3163  fprintf(stderr,
3164  "%s: Memory allocation error initializing process\n",
3165  program_name);
3166 
3167  _dsproc_destroy();
3168  exit(1);
3169 }
3170 
3171 /**
3172  * Start a processing interval loop.
3173  *
3174  * This function will:
3175  * - check if the process has (or will) exceed the maximum run time.
3176  * - determine the begin and end times of the next processing interval.
3177  *
3178  * @param interval_begin - output: begin time of the processing interval
3179  * @param interval_end - output: end time of the processing interval
3180  *
3181  * @return
3182  * - 1 if the next processing interval was returned
3183  * - 0 if processing is complete
3184  */
3186  time_t *interval_begin,
3187  time_t *interval_end)
3188 {
3189  time_t time_remaining;
3190  char begin_string[32];
3191  char end_string[32];
3192  int status;
3193  time_t next_begin_time;
3194  time_t last_begin_time;
3195  char ts1[32], ts2[32];
3196 
3197  *interval_begin = 0;
3198  *interval_end = 0;
3199 
3200  /* Determine the begin time of the next processing interval */
3201 
3202  if (_DSProc->interval_begin == 0) {
3203 
3204  _check_for_obs_loop(); // sets _DSProc->use_obs_loop
3205 
3206  if (!_DSProc->use_obs_loop) {
3207 
3208  /* Adjust processing period for the interval offset
3209  * that may have been set by the user. */
3210 
3211  _DSProc->period_begin += _DSProc->interval_offset;
3212  _DSProc->period_end += _DSProc->interval_offset;
3213 
3214  if (_DSProc->period_end_max) {
3215 
3216  while (_DSProc->period_end > _DSProc->period_end_max) {
3217  _DSProc->period_end -= _DSProc->proc_interval;
3218  }
3219 
3220  if (_DSProc->period_end <= _DSProc->period_begin) {
3221 
3223  "Missing input data for one or more datastreams.\n"
3224  " -> waiting for input data or the maximum wait time of %d hours is reached",
3225  (int)(_MaxRealTimeWait/3600 + 0.5));
3226 
3228  return(0);
3229  }
3230  }
3231  }
3232 
3233  next_begin_time = _DSProc->period_begin;
3234  }
3235  else {
3236  next_begin_time = _DSProc->interval_end;
3237  }
3238 
3239  if (!_DSProc->cmd_line_begin) {
3240 
3241  // A begin time was not specified on the command line so
3242  // we are running in "real time" mode.
3243 
3244  if (!_update_next_begin_time_file(next_begin_time)) {
3245  return(0);
3246  }
3247  }
3248  else {
3249 
3250  // Check if a next_begin_time file exists and update it if the
3251  // current begin time is greater than the time in the file.
3252 
3253  status = _read_next_begin_time_file(&last_begin_time);
3254  if (status < 0) return(0);
3255  if (status > 0 && next_begin_time > last_begin_time) {
3256 
3257  if (!_update_next_begin_time_file(next_begin_time)) {
3258  return(0);
3259  }
3260  }
3261  }
3262 
3263  /* Set process interval begin and end times */
3264 
3265  if (_DSProc->use_obs_loop) {
3266 
3267  status = _set_next_obs_loop_interval(next_begin_time);
3268  if (status <= 0) {
3269 
3270  if (status == 0 && (next_begin_time == _DSProc->period_begin)) {
3271 
3272  format_secs1970(next_begin_time, ts1);
3273 
3275  "\nNo data found after: %s\n",
3276  ts1);
3277 
3279  }
3280 
3281  return(0);
3282  }
3283 
3284  if (_DSProc->interval_begin > _DSProc->period_end) {
3285 
3286  if (next_begin_time == _DSProc->period_begin) {
3287 
3288  format_secs1970(_DSProc->period_begin, ts1);
3289  format_secs1970(_DSProc->period_end, ts2);
3290 
3292  "\nNo data found from '%s' to '%s'\n",
3293  ts1, ts2);
3294 
3296  }
3297 
3298  return(0);
3299  }
3300  }
3301  else {
3302  _DSProc->interval_begin = next_begin_time;
3303 
3304  /* Determine the end time of the next processing interval */
3305 
3306  _DSProc->interval_end = _DSProc->interval_begin
3307  + _DSProc->proc_interval;
3308 
3309  if (_DSProc->interval_end > _DSProc->period_end) {
3310 
3311  if (_DSProc->interval_begin == _DSProc->period_begin) {
3312  _DSProc->interval_end = _DSProc->period_end;
3313  }
3314  else {
3315  return(0);
3316  }
3317  }
3318  }
3319 
3320  *interval_begin = _DSProc->interval_begin;
3321  *interval_end = _DSProc->interval_end;
3322 
3323  /* Check the run time */
3324 
3325  if (_DSProc->loop_begin != 0) {
3326  _DSProc->loop_end = time(NULL);
3327  }
3328 
3329  time_remaining = dsproc_get_time_remaining();
3330 
3331  if (time_remaining >= 0) {
3332 
3333  if (time_remaining == 0) {
3334  return(0);
3335  }
3336  else if ((_DSProc->loop_end - _DSProc->loop_begin) > time_remaining) {
3337 
3339  "\nStopping vap before max run time of %d seconds is exceeded\n",
3340  (int)dsproc_get_max_run_time());
3341 
3343  return(0);
3344  }
3345  }
3346 
3347  _DSProc->loop_begin = time(NULL);
3348 
3349  /* Print debug and log messages */
3350 
3351  format_secs1970(*interval_begin, begin_string);
3352  format_secs1970(*interval_end, end_string);
3353 
3355  "PROCESSING DATA:\n"
3356  " - from: %s\n"
3357  " - to: %s\n",
3358  begin_string, end_string);
3359 
3361  "\nProcessing data: %s -> %s\n",
3362  begin_string, end_string);
3363 
3364  /* Update all datastream DODs for the current processing interval */
3365 
3366  if (!dsproc_update_datastream_dsdods(*interval_begin)) {
3367  return(0);
3368  }
3369 
3370  return(1);
3371 }
3372 
3373 /**
3374  * Finish a data system process.
3375  *
3376  * This function will:
3377  *
3378  * - Update the process status in the database
3379  * - Log all process stats that were recorded
3380  * - Disconnect from the database
3381  * - Mail all messages that were generated
3382  * - Close the process log file
3383  * - Free all memory used by the internal DSProc structure
3384  *
3385  * @return suggested program exit value (0 = success, 1 = failure)
3386  *
3387  * @see dsproc_init()
3388  */
3389 int dsproc_finish(void)
3390 {
3391  ProcStatus *proc_status = (ProcStatus *)NULL;
3392  char *last_status_text = (char *)NULL;
3393  time_t last_successful = 0;
3394  time_t last_completed = 0;
3395  int no_data_found = 0;
3396  int no_data_ok = 0;
3397  int total_in_records = 0;
3398  int total_records = 0;
3399  int total_files = 0;
3400  int last_errno = errno;
3401  int successful;
3402  const char *status_name;
3403  const char *status_text;
3404  char *status_message;
3405  char status_note[128];
3406  time_t delta_t;
3407  time_t finish_time;
3408  char finish_time_string[32];
3409  int mail_error_status;
3410  DataStream *ds;
3411  int dsi;
3412  char *hostname;
3413  char time_string1[32];
3414  char time_string2[32];
3415  int exit_value;
3416 
3418 
3420  "EXITING PROCESS\n");
3421 
3422  /************************************************************
3423  * Log output data and file stats
3424  *************************************************************/
3425 
3426  for (dsi = 0; dsi < _DSProc->ndatastreams; dsi++) {
3427 
3428  ds = _DSProc->datastreams[dsi];
3429 
3430  if (ds->role == DSR_INPUT &&
3431  ds->total_records &&
3432  _DSProc->retriever) {
3433 
3434  format_timeval(&(ds->begin_time), time_string1);
3435 
3436  if (ds->end_time.tv_sec) {
3437  format_timeval(&(ds->end_time), time_string2);
3438  }
3439  else {
3440  strcpy(time_string2, "none");
3441  }
3442 
3444  "\n"
3445  "Datastream Stats: %s\n"
3446  " - begin time: %s\n"
3447  " - end time: %s\n"
3448  " - total records: %d\n",
3449  ds->name, time_string1, time_string2,
3450  ds->total_records);
3451 
3452  total_in_records += ds->total_records;
3453 
3454  continue;
3455  }
3456 
3457  if (ds->role == DSR_OUTPUT &&
3458  ds->begin_time.tv_sec) {
3459 
3460  format_timeval(&(ds->begin_time), time_string1);
3461 
3462  if (ds->end_time.tv_sec) {
3463  format_timeval(&(ds->end_time), time_string2);
3464  }
3465  else {
3466  strcpy(time_string2, "none");
3467  }
3468 
3470  "\n"
3471  "Datastream Stats: %s\n"
3472  " - begin time: %s\n"
3473  " - end time: %s\n",
3474  ds->name, time_string1, time_string2);
3475 
3476  if (ds->total_files) {
3477 
3479  " - total files: %d\n"
3480  " - total bytes: %lu\n",
3481  ds->total_files, (unsigned long)ds->total_bytes);
3482 
3483  total_files += ds->total_files;
3484  }
3485 
3486  if (ds->total_records) {
3487 
3489  " - total records: %d\n",
3490  ds->total_records);
3491 
3492  total_records += ds->total_records;
3493  }
3494 
3495  continue;
3496  }
3497  }
3498 
3499  /************************************************************
3500  * Set status_name and status_text values
3501  *************************************************************/
3502 
3503  status_text = _DSProc->status;
3504  if (status_text[0] == '\0') {
3505 
3506  if (total_files || total_records) {
3507  status_text = DSPROC_SUCCESS;
3508  }
3509  else if (_DSProc->retriever && !total_in_records) {
3510  status_text = DSPROC_ENODATA;
3511  }
3512  else {
3513  status_text = DSPROC_ENOOUTDATA;
3514  }
3515  }
3516 
3517  if (strcmp(status_text, DSPROC_SUCCESS) == 0) {
3518  status_name = "Success";
3519  successful = 1;
3520  }
3521  else if (strcmp(status_text, DSPROC_ENODATA) == 0) {
3522  status_name = "NoDataFound";
3523  successful = 0;
3524  no_data_found = 1;
3525  }
3526  else if (strcmp(status_text, DSPROC_ENOOUTDATA) == 0) {
3527  status_name = "NoDataFound";
3528  successful = 0;
3529  no_data_found = 1;
3530  }
3531  else if (strcmp(status_text, DSPROC_ERUNTIME) == 0) {
3532  status_name = "MaxRuntimeExceeded";
3533  successful = 0;
3534  }
3535  else {
3536  status_name = "Failure";
3537  successful = 0;
3538  }
3539 
3540  status_note[0] = '\0';
3541 
3542  /************************************************************
3543  * Set the process status in the database
3544  *************************************************************/
3545 
3546  finish_time = time(NULL);
3547 
3548  if (!_DisableDBUpdates) {
3549 
3550  if (dsproc_db_connect()) {
3551 
3553  "Updating process status in database\n");
3554 
3555  /* Check if we need to disable the process */
3556 
3557  if (_DSProc->disable[0] != '\0') {
3558 
3560  "Disabling Process: %s\n", _DSProc->disable);
3561 
3562  finish_time = time(NULL);
3563 
3565  _DSProc->site, _DSProc->facility, _DSProc->type, _DSProc->name,
3566  "AutoDisabled", _DSProc->disable, finish_time);
3567  }
3568 
3569  /* Get the status of the last run */
3570 
3572  _DSProc->site, _DSProc->facility, _DSProc->type, _DSProc->name,
3573  &proc_status);
3574 
3575  if (proc_status) {
3576  last_status_text = proc_status->text;
3577  last_successful = proc_status->last_successful;
3578  last_completed = proc_status->last_completed;
3579  }
3580 
3581  /* Update the status in the database...
3582  *
3583  * We do not want to update the status in the database if no
3584  * input data was found and the data expectation interval is
3585  * greater than the difference between the process start time
3586  * and the last successful time.
3587  */
3588 
3589  if (no_data_found) {
3590 
3591  delta_t = _DSProc->start_time - last_successful;
3592 
3593  if (_DSProc->data_interval > delta_t) {
3594 
3595  status_name = "Success";
3596  status_text = DSPROC_SUCCESS;
3597  no_data_ok = 1;
3598  successful = 1;
3599 
3600  snprintf(status_note, 128,
3601  " -> No input data was found but we are within\n"
3602  " -> the data expectation interval of %g hours.\n",
3603  (double)_DSProc->data_interval/3600);
3604  }
3605  }
3606 
3607  finish_time = time(NULL);
3608 
3609  if (no_data_ok) {
3611  _DSProc->site, _DSProc->facility, _DSProc->type, _DSProc->name,
3612  finish_time);
3613  }
3614  else {
3616  _DSProc->site, _DSProc->facility, _DSProc->type, _DSProc->name,
3617  status_name, status_text, finish_time);
3618  }
3619 
3620  /* Store any updated datastream times */
3621 
3622  _dsproc_store_output_datastream_times();
3623 
3624  /* Close database connection */
3625 
3627  }
3628  else {
3629 
3631  "Could not update process status in database:\n"
3632  " -> database connect error\n");
3633 
3634  snprintf(status_note, 128,
3635  " -> Could not update status in database\n");
3636  }
3637  }
3638 
3639  /************************************************************
3640  * Create the status message
3641  *************************************************************/
3642 
3643  hostname = dsenv_get_hostname();
3644  if (!hostname) {
3645  hostname = "unknown";
3646  }
3647 
3648  format_secs1970(finish_time, finish_time_string);
3649 
3650  status_message = msngr_create_string(
3651  "Current Status (%s):\n"
3652  "Process: %s%s-%s-%s\n"
3653  "Version: %s\n"
3654  "Host: %s\n"
3655  "Status: %s\n"
3656  "%s",
3657  finish_time_string,
3658  _DSProc->site, _DSProc->facility, _DSProc->name, _DSProc->type,
3659  _DSProc->version, hostname, status_text, status_note);
3660 
3661  if (!status_message) {
3662 
3664  "Could not create status message\n"
3665  " -> memory allocation error\n");
3666  }
3667 
3668  /************************************************************
3669  * Add process status to the mail messages
3670  *************************************************************/
3671 
3672  if (!_DisableMail) {
3673 
3675  "Adding process status to mail messages\n");
3676 
3677  /* Error Mail */
3678 
3679  mail_error_status = 1;
3680 
3681  if (successful) {
3682  mail_error_status = 0;
3683  }
3684  else if (last_status_text) {
3685  if (strcmp(last_status_text, status_text) == 0) {
3686  mail_error_status = 0;
3687  }
3688  }
3689 
3690  _finish_mail(MSNGR_ERROR, mail_error_status, status_message,
3691  last_status_text, last_completed, last_successful, finish_time_string);
3692 
3693  /* Warning Mail */
3694 
3695  _finish_mail(MSNGR_WARNING, 0, status_message,
3696  last_status_text, last_completed, last_successful, finish_time_string);
3697 
3698  /* Maintainer Mail */
3699 
3700  _finish_mail(MSNGR_MAINTAINER, 0, status_message,
3701  last_status_text, last_completed, last_successful, finish_time_string);
3702  }
3703 
3704  /************************************************************
3705  * Add process status to the log file
3706  *************************************************************/
3707 
3709  "Adding process status to log file\n");
3710 
3711  if (status_message) {
3712 
3714  "\n%s", status_message);
3715 
3716  free(status_message);
3717  }
3718 
3719  /************************************************************
3720  * Send the mail and close the log file
3721  *************************************************************/
3722 
3723  msngr_finish();
3724 
3725  /************************************************************
3726  * Set suggested program exit value
3727  *************************************************************/
3728 
3729  if (successful) {
3730  exit_value = 0;
3731  }
3732  else if (dsproc_is_fatal(last_errno)) {
3733  exit_value = 2;
3734  }
3735  else {
3736  exit_value = 1;
3737  }
3738 
3739  /************************************************************
3740  * Free the memory
3741  *************************************************************/
3742 
3743  if (proc_status) {
3744  dsdb_free_process_status(proc_status);
3745  }
3746 
3747  _dsproc_destroy();
3748 
3749  /************************************************************
3750  * Return suggested exit value
3751  *************************************************************/
3752 
3754 
3755  if (successful) {
3757  "Suggested exit value: %d (successful)\n", exit_value);
3758  }
3759  else {
3761  "Suggested exit value: %d (failure)\n", exit_value);
3762  }
3763  }
3764 
3765  return(exit_value);
3766 }
3767 
3768 /************************************************************
3769  * Print the contents of the internal DSProc structure.
3770  *
3771  * @param fp - pointer to the output file
3772  */
3773 /*
3774 
3775 BDE: None of this has been updated in a looooong time, and a lot has been
3776 added. I leave it here in the off chance I will want/need it again...
3777 
3778 void dsproc_print_info(FILE *fp)
3779 {
3780  DSClass *dsc;
3781  DSDOD *dsdod;
3782  DataTimes *data_times;
3783  LogFile *log;
3784  Mail *mail;
3785  char time_string1[32];
3786  char time_string2[32];
3787  int i;
3788 
3789  if (!_DSProc) {
3790  fprintf(fp,
3791  "Datasystem process has not been initialized.\n");
3792  }
3793 
3794  if (_DSProc->dsdb && _DSProc->dsdb->dbconn) {
3795  fprintf(fp,
3796  "\n"
3797  "Database Host: %s\n"
3798  "Database Name: %s\n"
3799  "Database User: %s\n",
3800  _DSProc->dsdb->dbconn->db_host,
3801  _DSProc->dsdb->dbconn->db_name,
3802  _DSProc->dsdb->dbconn->db_user);
3803  }
3804 
3805  fprintf(fp,
3806  "\n"
3807  "Site: %s\n"
3808  "Facility: %s\n"
3809  "Process Name: %s\n"
3810  "Process Type: %s\n"
3811  "Process Version: %s\n",
3812  _DSProc->site, _DSProc->facility, _DSProc->name, _DSProc->type,
3813  _DSProc->version);
3814 
3815  format_secs1970(_DSProc->start_time, time_string1);
3816  format_secs1970(_DSProc->min_valid_time, time_string2);
3817 
3818  fprintf(fp,
3819  "\n"
3820  "Start Time: %s\n"
3821  "Max Run Time: %d seconds\n"
3822  "Min Valid Time: %s\n",
3823  time_string1, (int)_DSProc->max_run_time, time_string2);
3824 
3825  if (_DSProc->location) {
3826  fprintf(fp,
3827  "\n"
3828  "Location: %s\n"
3829  "Latitude: %g N\n"
3830  "Longitude: %g E\n"
3831  "Altitude: %g MSL\n",
3832  _DSProc->location->name,
3833  _DSProc->location->lat,
3834  _DSProc->location->lon,
3835  _DSProc->location->alt);
3836  }
3837 
3838  log = msngr_get_log_file();
3839 
3840  if (log) {
3841  fprintf(fp,
3842  "\n"
3843  "Log File Path: %s\n"
3844  "Log File Name: %s\n",
3845  log->path, log->name);
3846  }
3847 
3848  mail = msngr_get_mail(MSNGR_ERROR);
3849  if (mail) {
3850  fprintf(fp,
3851  "\n"
3852  "Error Mail: %s\n", mail->to);
3853  }
3854 
3855  mail = msngr_get_mail(MSNGR_WARNING);
3856  if (mail) {
3857  fprintf(fp,
3858  "Warning Mail: %s\n", mail->to);
3859  }
3860 
3861  mail = msngr_get_mail(MSNGR_MAINTAINER);
3862  if (mail) {
3863  fprintf(fp,
3864  "Mentor Mail: %s\n", mail->to);
3865  }
3866 
3867  if (_DSProc->data_interval) {
3868  fprintf(fp,
3869  "\n"
3870  "Expected Data Interval: %d seconds\n", (int)_DSProc->data_interval);
3871  }
3872 
3873  if (_DSProc->proc_interval) {
3874  fprintf(fp,
3875  "\n"
3876  "Data Processing Interval: %d seconds\n", (int)_DSProc->proc_interval);
3877  }
3878 
3879  if (_DSProc->ndsc_inputs) {
3880  fprintf(fp,
3881  "\n"
3882  "Input Datastream Classes:\n"
3883  "\n");
3884 
3885  for (i = 0; i < _DSProc->ndsc_inputs; i++) {
3886  dsc = _DSProc->dsc_inputs[i];
3887  fprintf(fp, " %s.%s\n", dsc->name, dsc->level);
3888  }
3889  }
3890 
3891  if (_DSProc->ndsc_outputs) {
3892  fprintf(fp,
3893  "\n"
3894  "Output Datastream Classes:\n"
3895  "\n");
3896 
3897  for (i = 0; i < _DSProc->ndsc_outputs; i++) {
3898  dsc = _DSProc->dsc_outputs[i];
3899  fprintf(fp, " %s.%s\n", dsc->name, dsc->level);
3900  }
3901  }
3902 
3903  if (_DSProc->ndsdods) {
3904  fprintf(fp,
3905  "\n"
3906  "Loaded Datastream DODs:\n"
3907  "\n");
3908 
3909  for (i = 0; i < _DSProc->ndsdods; i++) {
3910  dsdod = _DSProc->dsdods[i];
3911 
3912  fprintf(fp,
3913  " %s%s%s.%s-%s\n",
3914  _DSProc->site, dsdod->name, _DSProc->facility, dsdod->level,
3915  dsdod->version);
3916  }
3917  }
3918 
3919  if (_DSProc->ndatatimes) {
3920  fprintf(fp,
3921  "\n"
3922  "Previously Processed Data Times:\n");
3923 
3924  for (i = 0; i < _DSProc->ndatatimes; i++) {
3925 
3926  data_times = _DSProc->data_times[i];
3927 
3928  if (data_times->begin_time.tv_sec) {
3929  format_timeval(&data_times->begin_time, time_string1);
3930  }
3931  else {
3932  strcpy(time_string1, "N/A");
3933  }
3934 
3935  if (data_times->end_time.tv_sec) {
3936  format_timeval(&data_times->end_time, time_string2);
3937  }
3938  else {
3939  strcpy(time_string2, "N/A");
3940  }
3941 
3942  fprintf(fp,
3943  "\n"
3944  " %s%s%s.%s\n"
3945  " - begin time: %s\n"
3946  " - end time: %s\n",
3947  _DSProc->site, data_times->dsc_name, _DSProc->facility,
3948  data_times->dsc_level,
3949  time_string1,
3950  time_string2);
3951  }
3952  }
3953 
3954  if (_DSProc->input_dir) {
3955  fprintf(fp,
3956  "\n"
3957  "Input Directory: %s\n", _DSProc->input_dir);
3958  }
3959 
3960  if (_DSProc->file_patterns) {
3961  fprintf(fp,
3962  "\n"
3963  "File Patterns:\n"
3964  "\n");
3965 
3966  for (i = 0; i < _DSProc->npatterns; i++) {
3967  fprintf(fp,
3968  " %s\n", _DSProc->file_patterns[i]);
3969  }
3970  }
3971 }
3972 */
3973 
3974 /*******************************************************************************
3975  * Public Functions
3976  */
3977 /** @publicsection */
3978 
3979 /**
3980  * Get the process status.
3981  *
3982  * @return process status message
3983  */
3984 const char *dsproc_get_status(void)
3985 {
3986  return(_DSProc->status);
3987 }
3988 
3989 /**
3990  * Set the process status.
 *
 * A non-NULL status is copied into the status member of the global
 * _DSProc structure (at most 511 bytes are copied). A NULL status
 * resets the stored status to the empty string. _DSProc is
 * dereferenced without a NULL check.
3991  *
3992  * @param status - process status message
3993  */
3994 void dsproc_set_status(const char *status)
3995 {
3996  if (status) {
 /* NOTE(review): the line that opens the statement below is not
  * visible in this listing (numbering jumps from 3996 to 3998);
  * presumably a debug/log call - confirm against the original file. */
3998  "Setting status to: '%s'\n", status);
3999 
 /* strncpy() writes at most 511 bytes and adds no terminator when
  * status is 511 characters or longer, so termination then relies on
  * the following byte already being zero.
  * NOTE(review): assumes the status buffer holds at least 512 bytes
  * and that the (char *) cast is only removing a const qualifier -
  * confirm against the DSProc declaration. */
4000  strncpy((char *)_DSProc->status, status, 511);
4001  }
4002  else {
 /* NOTE(review): as above, the line that opens this statement
  * (4003) is missing from the listing. */
4004  "Clearing last status string\n");
4005 
 /* NULL input: reset the stored status to the empty string. */
4006  strcpy((char *)_DSProc->status, "");
4007  }
4008 }
4009 
4010 /**
4011  * Get the process site.
4012  *
4013  * @return site name
4014  */
4015 const char *dsproc_get_site(void)
4016 {
4017  return(_DSProc->site);
4018 }
4019 
4020 /**
4021  * Get the process facility.
4022  *
4023  * @return facility
4024  */
4025 const char *dsproc_get_facility(void)
4026 {
4027  return(_DSProc->facility);
4028 }
4029 
4030 /**
4031  * Get the process name.
4032  *
4033  * @return process name
4034  */
4035 const char *dsproc_get_name(void)
4036 {
4037  return(_DSProc->name);
4038 }
4039 
4040 /**
4041  * Get the process type.
4042  *
4043  * @return process type
4044  */
4045 const char *dsproc_get_type(void)
4046 {
4047  return(_DSProc->type);
4048 }
4049 
4050 /**
4051  * Get the process version.
4052  *
4053  * @return process version
4054  */
4055 const char *dsproc_get_version(void)
4056 {
4057  return(_DSProc->version);
4058 }