30 #include <sys/resource.h>
40 #define COREDUMPSIZE 12000000
55 static char *_LogsRoot = (
char *)NULL;
56 static char *_LogsDir = (
char *)NULL;
57 static char *_LogFile = (
char *)NULL;
58 static char *_LogID = (
char *)NULL;
59 static int _Reprocessing = 0;
60 static int _DynamicDODs = 0;
61 static int _Force = 0;
62 static int _LogInterval = 0;
63 static int _LogDataTime = 0;
65 static char _InputDir[PATH_MAX];
66 static char _InputFile[PATH_MAX];
67 static char _InputSource[PATH_MAX];
69 static int _RealTimeMode = 0;
71 static int _MaxRunTime = -1;
73 static int _AsynchronousMode = 0;
76 static time_t _MaxRealTimeWait = 3 * 86400;
82 static char *_get_logs_root()
94 else if (status == 0) {
97 "Could not get path to logs directory\n"
98 " -> the LOGS_DATA environment variable was not found\n");
101 return((
char *)NULL);
108 static int _lock_process(
110 const char *facility,
111 const char *proc_name,
112 const char *proc_type)
123 "Creating process lockfile:\n");
128 _DSProc->lockfile_path = strdup(_LogsDir);
134 lockfile_root = _get_logs_root();
135 if (!lockfile_root) {
143 lockfile_root, site);
149 "%s%s-%s-%s: Could not create lockfile\n"
150 " -> memory allocation error\n",
151 site, facility, proc_name, proc_type);
159 site, facility, proc_name, proc_type);
164 "%s%s-%s-%s: Could not create lockfile\n"
165 " -> memory allocation error\n",
166 site, facility, proc_name, proc_type);
170 _DSProc->lockfile_path = (
char *)NULL;
182 MAX_LOCKFILE_ERROR, errstr);
188 site, facility, proc_name, proc_type, errstr);
193 _DSProc->lockfile_path = (
char *)NULL;
194 _DSProc->lockfile_name = (
char *)NULL;
202 "%s%s-%s-%s: Removed stale lockfile\n"
204 site, facility, proc_name, proc_type,
211 static void _unlock_process(
void)
221 "Removing process lockfile:\n"
228 MAX_LOCKFILE_ERROR, errstr);
241 _DSProc->lockfile_path = (
char *)NULL;
242 _DSProc->lockfile_name = (
char *)NULL;
245 static int _init_process_log(
247 const char *facility,
248 const char *proc_name,
249 const char *proc_type)
259 memset(&gmt, 0,
sizeof(
struct tm));
262 "Opening process log file:\n");
267 log_path = strdup(_LogsDir);
273 logs_root = _get_logs_root();
281 logs_root, site, site, proc_name, facility);
287 "%s%s-%s-%s: Could not open process log\n"
288 " -> memory allocation error\n",
289 site, facility, proc_name, proc_type);
296 log_name = strdup(_LogFile);
300 site, proc_name, facility, _LogID, proc_type);
305 if (_LogDataTime &&
_DSProc->cmd_line_begin) {
306 log_time =
_DSProc->cmd_line_begin;
309 log_time = time(NULL);
312 gmtime_r(&log_time, &gmt);
316 switch (_LogInterval) {
319 site, proc_name, facility,
320 (gmt.tm_year + 1900), (gmt.tm_mon + 1), gmt.tm_mday, proc_type);
324 site, proc_name, facility,
325 (gmt.tm_year + 1900), (gmt.tm_mon + 1), gmt.tm_mday,
326 gmt.tm_hour, gmt.tm_min, gmt.tm_sec, proc_type);
330 site, proc_name, facility,
331 (gmt.tm_year + 1900), (gmt.tm_mon + 1), proc_type);
338 "%s%s-%s-%s: Could not open process log\n"
339 " -> memory allocation error\n",
340 site, facility, proc_name, proc_type);
353 log_path, log_name, (LOG_TAGS | LOG_STATS), MAX_LOG_ERROR, errstr);
361 "%s%s-%s-%s: Could not open process log\n"
363 site, facility, proc_name, proc_type, errstr);
372 static int _init_provenance_log(
374 const char *facility,
375 const char *proc_name,
376 const char *proc_type)
386 memset(&gmt, 0,
sizeof(
struct tm));
389 "Opening provenance log for: %s%s-%s-%s\n",
390 site, facility, proc_name, proc_type);
395 log_path = strdup(_LogsDir);
401 logs_root = _get_logs_root();
409 logs_root, site, site, proc_name, facility);
415 "%s%s-%s-%s: Could not open provenance log\n"
416 " -> memory allocation error\n",
417 site, facility, proc_name, proc_type);
428 site, proc_name, facility, _LogID, proc_type);
433 if (_LogDataTime &&
_DSProc->cmd_line_begin) {
434 log_time =
_DSProc->cmd_line_begin;
437 log_time = time(NULL);
440 gmtime_r(&log_time, &gmt);
445 site, proc_name, facility,
446 (gmt.tm_year + 1900), (gmt.tm_mon + 1), gmt.tm_mday,
447 gmt.tm_hour, gmt.tm_min, gmt.tm_sec, proc_type);
454 "%s%s-%s-%s: Could not open provenance log\n"
455 " -> memory allocation error\n",
456 site, facility, proc_name, proc_type);
469 log_path, log_name, (LOG_TAGS | LOG_STATS), MAX_LOG_ERROR, errstr);
477 "%s%s-%s-%s: Could not open provenance log\n"
479 site, facility, proc_name, proc_type, errstr);
488 static int _init_mail(
492 const char *config_key)
497 size_t mail_to_length;
506 "Checking database for '%s' custodians\n", config_key);
512 config_key, &proc_conf);
520 for (i = 0; proc_conf[i]; i++) {
521 mail_to_length += strlen(proc_conf[i]->value) + 2;
526 mail_to = (
char *)calloc(mail_to_length,
sizeof(
char));
530 "Could not initialize mail message for: %s\n"
531 " -> memory allocation error\n", config_key);
539 for (i = 0; proc_conf[i]; i++) {
543 if (i) strcat(mail_to,
",");
545 strcat(mail_to, proc_conf[i]->value);
568 "Could not initialize mail message for: %s\n"
569 " -> %s\n", config_key, errstr);
575 else if (status < 0) {
586 static void _finish_mail(
588 int mail_error_status,
589 char *status_message,
590 char *last_status_text,
591 time_t last_completed,
592 time_t last_successful,
593 char *finish_time_string)
596 char last_completed_string[32];
597 char last_successful_string[32];
608 if (mail->
body[0] !=
'\0' || mail_error_status) {
610 if (last_status_text) {
615 if (status_message && last_status_text) {
619 "Last Completed: %s\n"
620 "Last Successful: %s\n",
623 last_completed_string,
624 last_successful_string);
626 else if (status_message) {
629 "No Previous Status Has Been Recorded\n",
632 else if (last_status_text) {
634 "Current Status: %s\n"
635 "Status: Memory allocation error creating status message\n"
638 "Last Completed: %s\n"
639 "Last Successful: %s\n",
642 last_completed_string,
643 last_successful_string);
647 "Current Status: %s\n"
648 "Status: Memory allocation error creating status message\n"
650 "No Previous Status Has Been Recorded\n",
656 static void _signal_handler(
int sig, siginfo_t *si,
void *uc)
667 status =
"SIGQUIT: Quit (see termio(7I))";
671 status =
"SIGILL: Illegal Instruction";
674 status =
"SIGTRAP: Trace or Breakpoint Trap";
678 status =
"SIGABRT: Abort";
682 status =
"SIGEMT: Emulation Trap";
686 status =
"SIGFPE: Arithmetic Exception";
689 status =
"SIGBUS: Bus Error";
692 status =
"SIGSEGV: Segmentation Fault";
695 status =
"SIGSYS: Bad System Call";
698 status =
"SIGHUP: Hangup (see termio(7I))";
702 status =
"SIGINT: Interrupt (see termio(7I))";
706 status =
"SIGPIPE: Broken Pipe";
710 status =
"SIGALRM: Alarm Clock";
714 status =
"SIGTERM: Terminated";
718 status =
"Trapped Unknown Signal Type";
722 "Received Signal: %s\n", status);
727 if (msngr_debug_level) {
728 printstack(fileno(stdout));
743 if (input_dir && input_file) {
750 _dsproc_run_finish_process_hook();
756 static int _init_signal_handlers(
void)
758 struct sigaction act;
763 memset(&act, 0,
sizeof(act));
766 "Initializing signal handlers\n");
770 act.sa_flags = (SA_SIGINFO);
771 act.sa_sigaction = _signal_handler;
773 if (sigaction(SIGHUP, &act, 0) != 0 ||
774 sigaction(SIGINT, &act, 0) != 0 ||
775 sigaction(SIGQUIT, &act, 0) != 0 ||
776 sigaction(SIGILL, &act, 0) != 0 ||
777 sigaction(SIGTRAP, &act, 0) != 0 ||
778 sigaction(SIGABRT, &act, 0) != 0 ||
780 sigaction(SIGEMT, &act, 0) != 0 ||
782 sigaction(SIGFPE, &act, 0) != 0 ||
783 sigaction(SIGBUS, &act, 0) != 0 ||
784 sigaction(SIGSEGV, &act, 0) != 0 ||
785 sigaction(SIGSYS, &act, 0) != 0 ||
786 sigaction(SIGPIPE, &act, 0) != 0 ||
787 sigaction(SIGALRM, &act, 0) != 0 ||
788 sigaction(SIGTERM, &act, 0) != 0) {
791 "Could not initialize signal handlers:\n"
792 " -> %s\n", strerror(errno));
804 if (setrlimit(RLIMIT_CORE, &rl) == -1) {
807 "Could not set core file size limit:\n"
808 " -> %s\n", strerror(errno));
833 static int _dsproc_init(
void)
835 const char *site =
_DSProc->site;
836 const char *facility =
_DSProc->facility;
837 const char *proc_name =
_DSProc->name;
838 const char *proc_type =
_DSProc->type;
839 time_t start_time =
_DSProc->start_time;
841 const char *site_desc;
843 char mail_subject[128];
853 snprintf(mail_from, 64,
"%s%s%s", site, proc_name, facility);
857 snprintf(mail_subject, 128,
858 "%s Error: %s%s.%s ", proc_type, site, facility, proc_name);
860 if (!_init_mail(
MSNGR_ERROR, mail_from, mail_subject,
"error_mail")) {
866 snprintf(mail_subject, 128,
867 "%s Warning: %s%s.%s ", proc_type, site, facility, proc_name);
869 if (!_init_mail(
MSNGR_WARNING, mail_from, mail_subject,
"warning_mail")) {
875 snprintf(mail_subject, 128,
876 "%s Message: %s%s.%s ", proc_type, site, facility, proc_name);
878 if (!_init_mail(
MSNGR_MAINTAINER, mail_from, mail_subject,
"mentor_mail")) {
889 if (msngr_debug_level || msngr_provenance_level) {
891 char time_string[32];
896 "Updating process start time in database: %s\n", time_string);
900 _DSProc->dsdb, site, facility, proc_type, proc_name, start_time);
907 "Could not update process start time in database\n"
908 " -> unexpected NULL result from database");
920 if (!_init_signal_handlers()) {
944 if (!_dsproc_set_standard_exclude_atts()) {
959 "Could not get process location from database\n"
960 " -> unexpected NULL result from database query\n");
979 "Could not get site description from database\n"
980 " -> unexpected NULL result from database query\n");
992 if (_MaxRunTime >= 0) {
993 _DSProc->max_run_time = _MaxRunTime;
999 _DSProc->max_run_time = (time_t)atoi(config_value);
1002 else if (status == 0) {
1003 _DSProc->max_run_time = (time_t)0;
1017 _DSProc->data_interval = (time_t)atoi(config_value);
1020 else if (status == 0) {
1021 _DSProc->data_interval = (time_t)0;
1034 _DSProc->min_valid_time = (time_t)atoi(config_value);
1037 else if (status == 0) {
1039 _DSProc->min_valid_time = (time_t)694224000;
1056 static int _read_next_begin_time_file(time_t *begin_time)
1059 const char *path = log->
path;
1060 const char *file =
".next_begin_time";
1061 char full_path[PATH_MAX];
1065 int year, mon, day, hour, min, sec;
1069 snprintf(full_path, PATH_MAX,
"%s/%s", path, file);
1071 if (access(full_path, F_OK) != 0) {
1073 if (errno != ENOENT) {
1076 "Could not access file: %s\n"
1077 " -> %s\n", full_path, strerror(errno));
1087 "Getting processing period begin time from file: %s\n",
1092 fp = fopen(full_path,
"r");
1096 "Could not open file: %s\n"
1098 full_path, strerror(errno));
1105 if (!fgets(timestamp, 64, fp)) {
1108 "Could not read file: %s\n"
1110 full_path, strerror(errno));
1121 count = sscanf(timestamp,
1122 "%4d%2d%2d.%2d%2d%2d",
1123 &year, &mon, &day, &hour, &min, &sec);
1128 "Invalid timestamp format '%s' in file: %s\n"
1129 " -> expected a string of the form YYYYMMDD.hhmmss'\n",
1130 timestamp, full_path);
1136 *begin_time =
get_secs1970(year, mon, day, hour, min, sec);
1148 static int _update_next_begin_time_file(time_t begin_time)
1151 const char *path = log->
path;
1152 const char *file =
".next_begin_time";
1153 char full_path[PATH_MAX];
1157 snprintf(full_path, PATH_MAX,
"%s/%s", path, file);
1159 if (msngr_debug_level || msngr_provenance_level) {
1164 "Updating next processing period begin time file:\n"
1167 full_path, timestamp);
1178 fp = fopen(full_path,
"w");
1182 "Could not open file: %s\n"
1184 full_path, strerror(errno));
1191 if (!fprintf(fp,
"%s", timestamp)) {
1194 "Could not write to file: %s\n"
1196 full_path, strerror(errno));
1216 static void _check_for_obs_loop(
void)
1221 for (dsid = 0; dsid <
_DSProc->ndatastreams; dsid++) {
1223 ds =
_DSProc->datastreams[dsid];
1244 int _set_next_obs_loop_interval(time_t search_start)
1254 timeval_t search_begin = { search_start, 0 };
1260 const char *next_obs = NULL;
1265 "Searching for next observation after: %s\n",
1268 for (dsid = 0; dsid <
_DSProc->ndatastreams; dsid++) {
1270 ds =
_DSProc->datastreams[dsid];
1276 status = _dsproc_find_next_dsfile(ds->dir, &search_begin, &dsfile);
1288 file_begin = dsfile->timevals[0];
1289 file_end = dsfile->timevals[dsfile->ntimes - 1];
1291 if (!begin.tv_sec ||
TV_LT(file_begin, begin)) {
1294 next_obs = dsfile->name;
1296 else if (
TV_EQ(file_begin, begin)) {
1297 if (
TV_GT(file_end, end)) {
1299 next_obs = dsfile->name;
1304 if (!begin.tv_sec) {
1312 _DSProc->interval_begin = begin.tv_sec;
1313 _DSProc->interval_end = end.tv_sec + 1;
1316 " - found %s from %s to %s\n",
1332 static int _set_next_real_time_begin(
void)
1339 time_t now = time(NULL);
1354 status = _read_next_begin_time_file(&begin);
1370 if (ndsids < 0)
return(-1);
1372 for (i = 0; i < ndsids; ++i) {
1376 search.tv_sec = now;
1379 dsid, NULL, &search, &ntimes, &found)) {
1389 if (end == 0 || found.tv_sec < end) {
1399 "Getting processing period begin time from earliest output datastream end time\n");
1405 while (begin < end) begin +=
_DSProc->proc_interval;
1415 "Getting processing period begin time from earliest available input data\n");
1420 if (ndsids < 0)
return(-1);
1422 for (i = 0; i < ndsids; ++i) {
1429 dsid, &search, NULL, &ntimes, &found)) {
1439 if (begin == 0 || found.tv_sec < begin) {
1440 begin = found.tv_sec;
1449 "No data was found for any input datastreams.\n");
1459 memset(&gmt, 0,
sizeof(
struct tm));
1460 gmtime_r(&begin, &gmt);
1462 if (
_DSProc->proc_interval != 3600) {
1472 if (msngr_debug_level || msngr_provenance_level) {
1477 "Processing period begin time: %s\n",
1481 _DSProc->period_begin = begin;
1494 static int _set_next_real_time_end(
void)
1502 time_t now = time(NULL);
1518 "Determining the processing period end time\n"
1519 " - using %d hours for maximum input data wait time",
1520 (
int)(_MaxRealTimeWait/3600 + 0.5));
1528 if (ndsids < 0)
return(-1);
1530 for (i = 0; i < ndsids; ++i) {
1533 ds =
_DSProc->datastreams[dsid];
1535 search.tv_sec = now;
1538 dsid, NULL, &search, &ntimes, &found)) {
1550 if (ds->ret_cache) {
1551 found.tv_sec -= ds->ret_cache->end_offset;
1558 max_end < found.tv_sec) {
1559 max_end = found.tv_sec;
1565 delta_t = now - found.tv_sec;
1567 if (delta_t < _MaxRealTimeWait) {
1570 end > found.tv_sec) {
1583 "No new data was found for any input datastreams.\n");
1592 _DSProc->period_end_max = end;
1598 begin =
_DSProc->period_begin;
1601 count = (int)((end - begin) /
_DSProc->proc_interval);
1602 end = begin + (count *
_DSProc->proc_interval);
1611 "Missing input data for one or more datastreams.\n"
1612 " -> waiting for input data or the maximum wait time of %d hours is reached",
1613 (
int)(_MaxRealTimeWait/3600 + 0.5));
1619 if (msngr_debug_level || msngr_provenance_level) {
1624 "Processing period end time: %s\n",
1640 void _dsproc_destroy(
void)
1646 _LogsRoot = (
char *)NULL;
1651 _LogsDir = (
char *)NULL;
1656 _LogFile = (
char *)NULL;
1661 _LogID = (
char *)NULL;
1671 "Freeing internal memory\n");
1683 _dsproc_free_retriever();
1708 for (i = 0; i <
_DSProc->ndatastreams; i++) {
1709 _dsproc_free_datastream(
_DSProc->datastreams[i]);
1720 _dsproc_free_exclude_atts();
1721 _dsproc_free_excluded_qc_vars();
1722 _dsproc_free_input_file_list();
1765 const char *format, ...)
1773 "dsproc_abort() called outside main processing framework\n");
1777 if (!func) func =
"null";
1778 if (!file) file =
"null";
1780 if (format || status) {
1782 if (!format) format = status;
1784 va_start(args, format);
1793 free((
void *)status);
1806 "Error message not set in call to dsproc_abbort()\n");
1812 _dsproc_run_finish_process_hook();
1833 "Enabling Asynchronous Processing Mode:\n"
1834 " - disabling the process lock file\n"
1835 " - disabling check for chronological data processing\n"
1836 " - disabling overlap checks with previously processed data\n"
1837 " - forcing a new file to be created for every output dataset\n");
1840 _AsynchronousMode = 1;
1858 "Setting disable process message: '%s'\n", message);
1860 strncpy((
char *)
_DSProc->disable, message, 511);
1865 "Setting status to: '%s'\n", message);
1868 strncpy((
char *)
_DSProc->status, message, 511);
1919 return(_AsynchronousMode);
1931 return(
_DSProc->data_interval);
1945 return(_DynamicDODs);
1977 if (_InputDir[0])
return((
const char *)_InputDir);
1978 return((
const char *)NULL);
1992 if (_InputFile[0])
return((
const char *)_InputFile);
1993 return((
const char *)NULL);
2007 if (_InputSource[0])
return((
const char *)_InputSource);
2008 return((
const char *)NULL);
2018 return(
_DSProc->max_run_time);
2028 return(
_DSProc->min_valid_time);
2041 if (begin) *begin =
_DSProc->interval_begin;
2042 if (end) *end =
_DSProc->interval_end;
2043 return(
_DSProc->proc_interval);
2057 return(_RealTimeMode);
2071 return(_Reprocessing);
2108 if (
_DSProc->max_run_time == (time_t)0) {
2112 remaining =
_DSProc->start_time
2116 if (remaining <= 0) {
2118 exceeded = abs((
int)remaining);
2121 "Exceeded max run time of %d seconds by %d seconds\n",
2122 (
int)
_DSProc->max_run_time, (
int)exceeded);
2129 "Processing time remaining: %d seconds\n", (
int)remaining);
2157 "Checking for fatal system error:\n"
2158 " - dsproc_status: '%s'\n"
2159 " - errno: %d = '%s'\n",
2160 status, last_errno, strerror(last_errno));
2162 const char *fatal[] = {
2244 for (fi = 0; fatal[fi]; ++fi) {
2245 if (strcmp(status, fatal[fi]) == 0) {
2248 " - dsproc_status indicates a fatal system error\n");
2256 switch (last_errno) {
2351 case EPROTONOSUPPORT:
2352 case ESOCKTNOSUPPORT:
2392 case ENOTRECOVERABLE:
2397 " - last errno indicates a fatal system error\n");
2405 " - not a fatal system error\n");
2424 "Setting dynamic DODs mode to: %d\n", mode);
2426 _DynamicDODs = mode;
2443 "Setting force mode to: %d\n", mode);
2461 "Setting input directory: %s\n", input_dir);
2463 strncpy(_InputDir, input_dir, PATH_MAX);
2479 "Setting input source: %s/%s\n", _InputDir, input_file);
2481 strncpy(_InputFile, input_file, PATH_MAX);
2482 snprintf(_InputSource, PATH_MAX,
"%s/%s", _InputDir, input_file);
2496 _LogsDir = strdup(log_dir);
2501 "Could not set log file directory: %s\n"
2502 " -> memory allocation error\n",
2523 _LogFile = strdup(log_file);
2528 "Could not set log file name: %s\n"
2529 " -> memory allocation error\n",
2550 _LogID = strdup(log_id);
2555 "Could not set log file ID: %s\n"
2556 " -> memory allocation error\n",
2580 _LogInterval = interval;
2581 _LogDataTime = use_begin_time;
2595 "Setting maximum runtime to: %d\n", max_runtime);
2597 _MaxRunTime = max_runtime;
2614 char ts1[32], ts2[32];
2616 _DSProc->interval_begin = begin_time;
2617 _DSProc->interval_end = end_time;
2618 _DSProc->proc_interval = end_time - begin_time;
2621 "Setting processing interval:\n"
2622 " - begin time: %s\n"
2624 " - interval: %d seconds\n",
2642 "Setting processing interval offset to: %d seconds\n", (
int)offset);
2644 _DSProc->interval_offset = offset;
2661 "Setting reprocessing mode to: %d\n", mode);
2663 _Reprocessing = mode;
2681 "Setting real time mode to: %d\n"
2682 " -> max wait time = %g hours\n",
2685 _RealTimeMode = mode;
2687 if (max_wait >= 0) {
2688 _MaxRealTimeWait = (int)(max_wait * 3600.0);
2727 const char *proc_version,
2729 const char **proc_names)
2731 const char *program_name = argv[0];
2732 time_t start_time = time(NULL);
2734 const char *facility;
2735 const char *proc_name;
2736 const char *proc_type;
2751 _DSProc = (DSProc *)calloc(1,
sizeof(DSProc));
2755 "%s: Memory allocation error initializing process\n",
2761 _DSProc->start_time = start_time;
2767 if (!(
_DSProc->version = strdup(proc_version))) {
2770 _dsproc_trim_version((
char *)
_DSProc->version);
2773 if (!(
_DSProc->version = strdup(
"Unknown"))) {
2780 if (nproc_names == 1) {
2781 if (!(
_DSProc->name = strdup(proc_names[0]))) {
2792 if (!(
_DSProc->type = strdup(
"Ingest"))) {
2806 _dsproc_vap_parse_args(argc, argv, nproc_names, proc_names);
2809 _dsproc_ingest_parse_args(argc, argv, nproc_names, proc_names);
2813 if (!(
_DSProc->type = strdup(
"VAP"))) {
2816 _dsproc_vap_parse_args(argc, argv, nproc_names, proc_names);
2835 "INITIALIZING PROCESS: %s%s-%s-%s\n",
2836 site, facility, proc_name, proc_type);
2839 if (!(
_DSProc->db_alias = strdup(
"dsdb_data"))) {
2849 if (!_lock_process(site, facility, proc_name, proc_type)) {
2860 "Initializing database connection: %s\n",
_DSProc->db_alias);
2866 "%s%s-%s-%s: Could not initialize database connection\n",
2867 site, facility, proc_name, proc_type);
2875 if (db_attempts == 0) {
2878 "%s%s-%s-%s: Could not connect to database\n",
2879 site, facility, proc_name, proc_type);
2887 if (
_DSProc->dsdb->dbconn->db_host[0] !=
'\0') {
2889 " - db_host: %s\n",
_DSProc->dsdb->dbconn->db_host);
2892 if (
_DSProc->dsdb->dbconn->db_name[0] !=
'\0') {
2894 " - db_name: %s\n",
_DSProc->dsdb->dbconn->db_name);
2897 if (
_DSProc->dsdb->dbconn->db_user[0] !=
'\0') {
2899 " - db_user: %s\n",
_DSProc->dsdb->dbconn->db_user);
2903 if (
_DSProc->dsdb->dbconn->db_type == DB_WSPC) {
2906 " - using read-only web service connection\n"
2907 " - disabled database updates\n"
2908 " - disabled mail messages\n");
2919 _DSProc->dsdb, site, facility, proc_type, proc_name, &fam_proc);
2924 "%s%s-%s-%s: Process not found in database\n",
2925 site, facility, proc_name, proc_type);
2939 if (!_init_provenance_log(site, facility, proc_name, proc_type)) {
2945 "Initializing process: %s%s-%s-%s\n",
2946 site, facility, proc_name, proc_type);
2950 "Process version: %s\n",
_DSProc->version);
2953 "Library versions:\n"
2954 " - libdsproc3: %s\n"
2959 " - libarmutils: %s\n"
2960 " - libdbconn: %s\n"
2961 " - libmsngr: %s\n",
2964 trans_lib_version(),
2968 dbconn_lib_version(),
2976 "Created process lockfile:\n"
2984 "Using database connection:\n");
2986 if (
_DSProc->dsdb->dbconn->db_host[0] !=
'\0') {
2988 " - db_host: %s\n",
_DSProc->dsdb->dbconn->db_host);
2991 if (
_DSProc->dsdb->dbconn->db_name[0] !=
'\0') {
2993 " - db_name: %s\n",
_DSProc->dsdb->dbconn->db_name);
2996 if (
_DSProc->dsdb->dbconn->db_user[0] !=
'\0') {
2998 " - db_user: %s\n",
_DSProc->dsdb->dbconn->db_user);
3001 if (
_DSProc->dsdb->dbconn->db_type == DB_WSPC) {
3003 " - using read-only web service connection\n"
3004 " - disabled database updates\n"
3005 " - disabled mail messages\n");
3013 if (!_init_process_log(site, facility, proc_name, proc_type)) {
3020 if (db_attempts > 1) {
3023 "\nDB_ATTEMPTS: It took %d attempts to connect to the database.\n",
3032 if (!_dsproc_init()) {
3045 if (!_dsproc_init_input_datastreams()) {
3052 if (!_dsproc_init_output_datastreams()) {
3065 if (!_dsproc_init_output_datastreams()) {
3072 if (!_dsproc_init_retriever()) {
3081 "Could not find retriever definition in database\n");
3093 _DSProc->proc_interval = (time_t)atoi(config_value);
3096 if (
_DSProc->proc_interval <= 0) {
3100 else if (status < 0) {
3113 "Processing interval not defined or <= 0:\n"
3114 " - using interval between begin and end times specified on command line: %d seconds\n",
3119 _DSProc->proc_interval = 86400;
3122 "Processing interval not defined or <= 0:\n"
3123 " - using default value: %d seconds\n",
3130 if (
_DSProc->cmd_line_begin) {
3135 status = _set_next_real_time_begin();
3145 else if (
_DSProc->cmd_line_begin) {
3151 status = _set_next_real_time_end();
3164 "%s: Memory allocation error initializing process\n",
3186 time_t *interval_begin,
3187 time_t *interval_end)
3189 time_t time_remaining;
3190 char begin_string[32];
3191 char end_string[32];
3193 time_t next_begin_time;
3194 time_t last_begin_time;
3195 char ts1[32], ts2[32];
3197 *interval_begin = 0;
3202 if (
_DSProc->interval_begin == 0) {
3204 _check_for_obs_loop();
3214 if (
_DSProc->period_end_max) {
3223 "Missing input data for one or more datastreams.\n"
3224 " -> waiting for input data or the maximum wait time of %d hours is reached",
3225 (
int)(_MaxRealTimeWait/3600 + 0.5));
3233 next_begin_time =
_DSProc->period_begin;
3236 next_begin_time =
_DSProc->interval_end;
3239 if (!
_DSProc->cmd_line_begin) {
3244 if (!_update_next_begin_time_file(next_begin_time)) {
3253 status = _read_next_begin_time_file(&last_begin_time);
3254 if (status < 0)
return(0);
3255 if (status > 0 && next_begin_time > last_begin_time) {
3257 if (!_update_next_begin_time_file(next_begin_time)) {
3267 status = _set_next_obs_loop_interval(next_begin_time);
3270 if (status == 0 && (next_begin_time ==
_DSProc->period_begin)) {
3275 "\nNo data found after: %s\n",
3286 if (next_begin_time ==
_DSProc->period_begin) {
3292 "\nNo data found from '%s' to '%s'\n",
3302 _DSProc->interval_begin = next_begin_time;
3320 *interval_begin =
_DSProc->interval_begin;
3321 *interval_end =
_DSProc->interval_end;
3325 if (
_DSProc->loop_begin != 0) {
3326 _DSProc->loop_end = time(NULL);
3331 if (time_remaining >= 0) {
3333 if (time_remaining == 0) {
3336 else if ((
_DSProc->loop_end -
_DSProc->loop_begin) > time_remaining) {
3339 "\nStopping vap before max run time of %d seconds is exceeded\n",
3347 _DSProc->loop_begin = time(NULL);
3355 "PROCESSING DATA:\n"
3358 begin_string, end_string);
3361 "\nProcessing data: %s -> %s\n",
3362 begin_string, end_string);
3392 char *last_status_text = (
char *)NULL;
3393 time_t last_successful = 0;
3394 time_t last_completed = 0;
3395 int no_data_found = 0;
3397 int total_in_records = 0;
3398 int total_records = 0;
3399 int total_files = 0;
3400 int last_errno = errno;
3402 const char *status_name;
3403 const char *status_text;
3404 char *status_message;
3405 char status_note[128];
3408 char finish_time_string[32];
3409 int mail_error_status;
3413 char time_string1[32];
3414 char time_string2[32];
3420 "EXITING PROCESS\n");
3426 for (dsi = 0; dsi <
_DSProc->ndatastreams; dsi++) {
3428 ds =
_DSProc->datastreams[dsi];
3431 ds->total_records &&
3436 if (ds->end_time.tv_sec) {
3440 strcpy(time_string2,
"none");
3445 "Datastream Stats: %s\n"
3446 " - begin time: %s\n"
3448 " - total records: %d\n",
3449 ds->name, time_string1, time_string2,
3452 total_in_records += ds->total_records;
3458 ds->begin_time.tv_sec) {
3462 if (ds->end_time.tv_sec) {
3466 strcpy(time_string2,
"none");
3471 "Datastream Stats: %s\n"
3472 " - begin time: %s\n"
3473 " - end time: %s\n",
3474 ds->name, time_string1, time_string2);
3476 if (ds->total_files) {
3479 " - total files: %d\n"
3480 " - total bytes: %lu\n",
3481 ds->total_files, (
unsigned long)ds->total_bytes);
3483 total_files += ds->total_files;
3486 if (ds->total_records) {
3489 " - total records: %d\n",
3492 total_records += ds->total_records;
3503 status_text =
_DSProc->status;
3504 if (status_text[0] ==
'\0') {
3506 if (total_files || total_records) {
3509 else if (
_DSProc->retriever && !total_in_records) {
3518 status_name =
"Success";
3522 status_name =
"NoDataFound";
3527 status_name =
"NoDataFound";
3532 status_name =
"MaxRuntimeExceeded";
3536 status_name =
"Failure";
3540 status_note[0] =
'\0';
3546 finish_time = time(NULL);
3553 "Updating process status in database\n");
3557 if (
_DSProc->disable[0] !=
'\0') {
3560 "Disabling Process: %s\n",
_DSProc->disable);
3562 finish_time = time(NULL);
3566 "AutoDisabled",
_DSProc->disable, finish_time);
3576 last_status_text = proc_status->
text;
3589 if (no_data_found) {
3591 delta_t =
_DSProc->start_time - last_successful;
3593 if (
_DSProc->data_interval > delta_t) {
3595 status_name =
"Success";
3600 snprintf(status_note, 128,
3601 " -> No input data was found but we are within\n"
3602 " -> the data expectation interval of %g hours.\n",
3603 (
double)
_DSProc->data_interval/3600);
3607 finish_time = time(NULL);
3617 status_name, status_text, finish_time);
3622 _dsproc_store_output_datastream_times();
3631 "Could not update process status in database:\n"
3632 " -> database connect error\n");
3634 snprintf(status_note, 128,
3635 " -> Could not update status in database\n");
3645 hostname =
"unknown";
3651 "Current Status (%s):\n"
3652 "Process: %s%s-%s-%s\n"
3659 _DSProc->version, hostname, status_text, status_note);
3661 if (!status_message) {
3664 "Could not create status message\n"
3665 " -> memory allocation error\n");
3675 "Adding process status to mail messages\n");
3679 mail_error_status = 1;
3682 mail_error_status = 0;
3684 else if (last_status_text) {
3685 if (strcmp(last_status_text, status_text) == 0) {
3686 mail_error_status = 0;
3690 _finish_mail(
MSNGR_ERROR, mail_error_status, status_message,
3691 last_status_text, last_completed, last_successful, finish_time_string);
3696 last_status_text, last_completed, last_successful, finish_time_string);
3701 last_status_text, last_completed, last_successful, finish_time_string);
3709 "Adding process status to log file\n");
3711 if (status_message) {
3714 "\n%s", status_message);
3716 free(status_message);
3757 "Suggested exit value: %d (successful)\n", exit_value);
3761 "Suggested exit value: %d (failure)\n", exit_value);
3998 "Setting status to: '%s'\n", status);
4000 strncpy((
char *)
_DSProc->status, status, 511);
4004 "Clearing last status string\n");
4006 strcpy((
char *)
_DSProc->status,
"");