root/dmtcp_coordinator.cpp

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. readProcessInfo
  2. getNewVirtualPid
  3. handleUserCommand
  4. printStatus
  5. updateMinimumState
  6. onData
  7. removeStaleSharedAreaFile
  8. preExitCleanup
  9. onDisconnect
  10. initializeComputation
  11. onConnect
  12. processDmtUserCmd
  13. validateRestartingWorkerProcess
  14. validateNewWorkerProcess
  15. startCheckpoint
  16. broadcastMessage
  17. getStatus
  18. writeRestartScript
  19. signalHandler
  20. setupSignalHandlers
  21. calcLocalAddr
  22. resetCkptTimer
  23. updateCheckpointInterval
  24. eventLoop
  25. addDataSocket
  26. main

   1 /****************************************************************************
   2  *   Copyright (C) 2006-2013 by Jason Ansel, Kapil Arya, and Gene Cooperman *
   3  *   jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu              *
   4  *                                                                          *
   5  *  This file is part of DMTCP.                                             *
   6  *                                                                          *
   7  *  DMTCP is free software: you can redistribute it and/or                  *
   8  *  modify it under the terms of the GNU Lesser General Public License as   *
   9  *  published by the Free Software Foundation, either version 3 of the      *
  10  *  License, or (at your option) any later version.                         *
  11  *                                                                          *
  12  *  DMTCP is distributed in the hope that it will be useful,                *
  13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of          *
  14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           *
  15  *  GNU Lesser General Public License for more details.                     *
  16  *                                                                          *
  17  *  You should have received a copy of the GNU Lesser General Public        *
  18  *  License along with DMTCP:dmtcp/src.  If not, see                        *
  19  *  <http://www.gnu.org/licenses/>.                                         *
  20  ****************************************************************************/
  21 
  22 /****************************************************************************
  23  * Coordinator code logic:                                                  *
  24  * main calls eventLoop, a top-level event loop.                            *
  25  * eventLoop calls:  onConnect, onData, onDisconnect, startCheckpoint       *
  26  *   when client or dmtcp_command talks to coordinator.                     *
  27  * onConnect called on msg at listener port.  It passes control to:         *
  28  *   handleUserCommand, which takes single char arg ('s', 'c', 'k', 'q', ...)*
  29  * handleUserCommand calls broadcastMessage to send data back               *
  30  * any message sent by broadcastMessage takes effect only on returning      *
 *   back up to top level eventLoop                                         *
  32  * Hence, even for checkpoint, handleUserCommand just changes state,        *
  33  *   broadcasts an initial checkpoint command, and then returns to top      *
  34  *   level.  Replies from clients then drive further state changes.         *
  35  * The prefix command 'b' (blocking) from dmtcp_command modifies behavior   *
  36  *   of 'c' so that the reply to dmtcp_command happens only when clients    *
  37  *   are back in RUNNING state.                                             *
  38  * onData called when a message arrives at a client's port.  It either      *
  39  *   processes a per-client special request, or continues the protocol      *
  40  *   for a checkpoint or restart sequence (see below).                      *
  41  *                                                                          *
  42  * updateMinimumState() is responsible for keeping track of states.         *
  43  * The coordinator keeps a ComputationStatus, with minimumState and         *
  44  *   maximumState for states of all workers, accessed through getStatus()   *
  45  *   or through minimumState()                                              *
  46  * The states for a worker (client) are:                                    *
  47  * Checkpoint: RUNNING -> SUSPENDED -> FD_LEADER_ELECTION -> DRAINED        *
  48  *                -> CHECKPOINTED -> NAME_SERVICE_DATA_REGISTERED           *
  49  *                -> DONE_QUERYING -> REFILLED -> RUNNING                   *
  50  * Restart:    RESTARTING -> CHECKPOINTED -> NAME_SERVICE_DATA_REGISTERED   *
  51  *                -> DONE_QUERYING -> REFILLED -> RUNNING                   *
  52  * If debugging, set gdb breakpoint on:                                     *
  53  *   DmtcpCoordinator::onConnect                                            *
  54  *   DmtcpCoordinator::onData                                               *
  55  *   DmtcpCoordinator::handleUserCommand                                    *
  56  *   DmtcpCoordinator::broadcastMessage                                     *
  57  ****************************************************************************/
  58 
  59 #include <stdlib.h>
  60 #include "dmtcp_coordinator.h"
  61 #include "constants.h"
  62 #include "protectedfds.h"
  63 #include "dmtcpmessagetypes.h"
  64 #include "lookup_service.h"
  65 #include "syscallwrappers.h"
  66 #include "util.h"
  67 #include "../jalib/jconvert.h"
  68 #include "../jalib/jtimer.h"
  69 #include "../jalib/jfilesystem.h"
  70 #include <stdio.h>
  71 #include <unistd.h>
  72 #include <sys/stat.h>
  73 #include <algorithm>
  74 #include <iomanip>
  75 #include <sys/wait.h>
  76 #include <sys/types.h>
  77 #include <sys/stat.h>
  78 #include <netdb.h>
  79 #include <arpa/inet.h>
  80 #include <fcntl.h>
  81 #undef min
  82 #undef max
  83 
  84 #define BINARY_NAME "dmtcp_coordinator"
  85 
  86 using namespace dmtcp;
  87 
// Help text listing the single-character commands accepted at run time.
// handleUserCommand() writes it to JASSERT_STDERR for 'h', 'H' and '?'.
// Note: handleUserCommand() also accepts 'b', 'x' and 't', which are not
// listed here.
static const char* theHelpMessage =
  "COMMANDS:\n"
  "  l : List connected nodes\n"
  "  s : Print status message\n"
  "  c : Checkpoint all nodes\n"
  "  i : Print current checkpoint interval\n"
  "      (To change checkpoint interval, use dmtcp_command)\n"
  "  k : Kill all nodes\n"
  "  q : Kill all nodes and quit\n"
  "  ? : Show this message\n"
  "\n"
;
 100 
 101 static const char* theUsage =
 102   "Usage: dmtcp_coordinator [OPTIONS] [port]\n"
 103   "Coordinates checkpoints between multiple processes.\n\n"
 104   "Options:\n"
 105   "  -p, --coord-port PORT_NUM (environment variable DMTCP_COORD_PORT)\n"
 106   "      Port to listen on (default: 7779)\n"
 107   "  --port-file filename\n"
 108   "      File to write listener port number.\n"
 109   "      (Useful with '--port 0', which is used to assign a random port)\n"
 110   "  --ckptdir (environment variable DMTCP_CHECKPOINT_DIR):\n"
 111   "      Directory to store dmtcp_restart_script.sh (default: ./)\n"
 112   "  --tmpdir (environment variable DMTCP_TMPDIR):\n"
 113   "      Directory to store temporary files (default: env var TMDPIR or /tmp)\n"
 114   "  --exit-on-last\n"
 115   "      Exit automatically when last client disconnects\n"
 116   "  --exit-after-ckpt\n"
 117   "      Exit automatically after checkpoint is created\n"
 118   "  --daemon\n"
 119   "      Run silently in the background after detaching from the parent process.\n"
 120   "  -i, --interval (environment variable DMTCP_CHECKPOINT_INTERVAL):\n"
 121   "      Time in seconds between automatic checkpoints\n"
 122   "      (default: 0, disabled)\n"
 123   "  --help:\n"
 124   "      Print this message and exit.\n"
 125   "  --version:\n"
 126   "      Print version information and exit.\n"
 127   "\n"
 128   "COMMANDS:\n"
 129   "      type '?<return>' at runtime for list\n"
 130   "\n"
 131   HELP_AND_CONTACT_INFO
 132   "\n"
 133 ;
 134 
 135 
 136 static const char* theRestartScriptHeader =
 137   "#!/bin/bash\n\n"
 138   "set -m # turn on job control\n\n"
 139   "#This script launches all the restarts in the background.\n"
 140   "#Suggestions for editing:\n"
 141   "#  1. For those processes executing on the localhost, remove\n"
 142   "#     'ssh <hostname> from the start of the line.\n"
 143   "#  2. If using ssh, verify that ssh does not require passwords or other\n"
 144   "#     prompts.\n"
 145   "#  3. Verify that the dmtcp_restart command is in your path on all hosts,\n"
 146   "#     otherwise set the dmt_rstr_cmd appropriately.\n"
 147   "#  4. Verify DMTCP_COORD_HOST and DMTCP_COORD_PORT match the location of\n"
 148   "#     the dmtcp_coordinator. If necessary, add\n"
 149   "#     'DMTCP_COORD_PORT=<dmtcp_coordinator port>' after\n"
 150   "#     'DMTCP_COORD_HOST=<...>'.\n"
 151   "#  5. Remove the '&' from a line if that process reads STDIN.\n"
 152   "#     If multiple processes read STDIN then prefix the line with\n"
 153   "#     'xterm -hold -e' and put '&' at the end of the line.\n"
 154   "#  6. Processes on same host can be restarted with single dmtcp_restart\n"
 155   "#     command.\n\n\n"
 156 ;
 157 
 158 
// Shell source for the function check_local(), which
// theRestartScriptMultiHostProcessing calls as `check_local $worker_host`.
//
// The function resolves its first argument to an IP address, trying
// `gethostip -d`, then `nslookup`, then `getent ahosts` in that order, and
// makes the script exit with status 1 if none of them yields an address.
// It then greps the output of `ifconfig -a` (falling back to /sbin/ifconfig
// when `which ifconfig` finds nothing) for that address and sets the shell
// variable is_local_node to 1 on a match, 0 otherwise.
// NOTE(review): the grep pattern expects the "inet addr:... Bcast" layout;
// confirm this matches the ifconfig output on the target systems.
static const char* theRestartScriptCheckLocal =
  "check_local()\n"
  "{\n"
  "  worker_host=$1\n"
  "  unset is_local_node\n"
  "  worker_ip=$(gethostip -d $worker_host 2> /dev/null)\n"
  "  if [ -z \"$worker_ip\" ]; then\n"
  "    worker_ip=$(nslookup $worker_host | grep -A1 'Name:' | grep 'Address:' | sed -e 's/Address://' -e 's/ //' -e 's/ //')\n"
  "  fi\n"
  "  if [ -z \"$worker_ip\" ]; then\n"
  "    worker_ip=$(getent ahosts $worker_host |grep \"^[0-9]\\+\\.[0-9]\\+\\.[0-9]\\+\\.[0-9]\\+ *STREAM\" | cut -d' ' -f1)\n"
  "  fi\n"
  "  if [ -z \"$worker_ip\" ]; then\n"
  "    echo Could not find ip-address for $worker_host. Exiting...\n"
  "    exit 1\n"
  "  fi\n"
  "  ifconfig_path=$(which ifconfig)\n"
  "  if [ -z \"$ifconfig_path\" ]; then\n"
  "    ifconfig_path=\"/sbin/ifconfig\"\n"
  "  fi\n"
  "  output=$($ifconfig_path -a | grep \"inet addr:.*${worker_ip} .*Bcast\")\n"
  "  if [ -n \"$output\" ]; then\n"
  "    is_local_node=1\n"
  "  else\n"
  "    is_local_node=0\n"
  "  fi\n"
  "}\n\n\n";
 186 
// Shell source for the function pass_slurm_helper_contact().
//
// The function takes a whitespace-separated list of checkpoint file names in
// $1.  It picks a per-user, per-host temp directory
// (dmtcp-`whoami`@`hostname` under $DMTCP_TMPDIR, else $TMPDIR, else /tmp),
// creating it if missing.  For every checkpoint file it strips a trailing
// ".dmtcp", keeps the part after the last '_' and appends a
// DMTCP_SRUN_HELPER_ADDR=<value> line to
// $CURRENT_TMPDIR/slurm_env_<that part>.
static const char *slurmHelperContactFunction =
"pass_slurm_helper_contact()\n"
"{\n"
"  LOCAL_FILES=\"$1\"\n"
"  # Create temp directory if needed\n"
"  if [ -n \"$DMTCP_TMPDIR\" ]; then\n"
"    CURRENT_TMPDIR=$DMTCP_TMPDIR/dmtcp-`whoami`@`hostname`\n"
"  elif [ -n \"$TMPDIR\" ]; then\n"
"    CURRENT_TMPDIR=$TMPDIR/dmtcp-`whoami`@`hostname`\n"
"  else\n"
"    CURRENT_TMPDIR=/tmp/dmtcp-`whoami`@`hostname`\n"
"  fi\n"
"  if [ ! -d \"$CURRENT_TMPDIR\" ]; then\n"
"    mkdir -p $CURRENT_TMPDIR\n"
"  fi\n"
"  # Create files with SLURM environment\n"
"  for CKPT_FILE in $LOCAL_FILES; do\n"
"    SUFFIX=${CKPT_FILE%%.dmtcp}\n"
"    SLURM_ENV_FILE=$CURRENT_TMPDIR/slurm_env_${SUFFIX##*_}\n"
"    echo \"DMTCP_SRUN_HELPER_ADDR=$DMTCP_SRUN_HELPER_ADDR\" >> $SLURM_ENV_FILE\n"
"  done\n"
"}\n\n\n";
 209 
 210 static const char* theRestartScriptUsage =
 211   "usage_str='USAGE:\n"
 212   "  dmtcp_restart_script.sh [OPTIONS]\n\n"
 213   "OPTIONS:\n"
 214   "  --coord-host, -h, (environment variable DMTCP_COORD_HOST):\n"
 215   "      Hostname where dmtcp_coordinator is running\n"
 216   "  --coord-port, -p, (environment variable DMTCP_COORD_PORT):\n"
 217   "      Port where dmtcp_coordinator is running\n"
 218   "  --hostfile <arg0> :\n"
 219   "      Provide a hostfile (One host per line, \"#\" indicates comments)\n"
 220   "  --ckptdir, -d, (environment variable DMTCP_CHECKPOINT_DIR):\n"
 221   "      Directory to store checkpoint images\n"
 222   "      (default: use the same directory used in previous checkpoint)\n"
 223   "  --restartdir, -d, (environment variable DMTCP_RESTART_DIR):\n"
 224   "      Directory to read checkpoint images from\n"
 225   "  --tmpdir, -t, (environment variable DMTCP_TMPDIR):\n"
 226   "      Directory to store temporary files (default: $TMDPIR or /tmp)\n"
 227   "  --no-strict-uid-checking:\n"
 228   "      Disable uid checking for the checkpoint image.  This allows the\n"
 229   "        checkpoint image to be restarted by a different user than the one\n"
 230   "        that create it. (environment variable DMTCP_DISABLE_UID_CHECKING)\n"
 231   "  --interval, -i, (environment variable DMTCP_CHECKPOINT_INTERVAL):\n"
 232   "      Time in seconds between automatic checkpoints\n"
 233   "      (Default: Use pre-checkpoint value)\n"
 234   "  --help:\n"
 235   "      Print this message and exit.\'\n"
 236   "\n\n"
 237 ;
 238 
 239 static const char* theRestartScriptCmdlineArgHandler =
 240   "if [ $# -gt 0 ]; then\n"
 241   "  while [ $# -gt 0 ]\n"
 242   "  do\n"
 243   "    if [ $1 = \"--help\" ]; then\n"
 244   "      echo \"$usage_str\"\n"
 245   "      exit\n"
 246   "    elif [ $# -ge 1 ]; then\n"
 247   "      case \"$1\" in\n"
 248   "        --coord-host|--host|-h)\n"
 249   "          coord_host=\"$2\"\n"
 250   "          shift; shift;;\n"
 251   "        --coord-port|--port|-p)\n"
 252   "          coord_port=\"$2\"\n"
 253   "          shift; shift;;\n"
 254   "        --hostfile)\n"
 255   "          hostfile=\"$2\"\n"
 256   "          if [ ! -f \"$hostfile\" ]; then\n"
 257   "            echo \"ERROR: hostfile $hostfile not found\"\n"
 258   "            exit\n"
 259   "          fi\n"
 260   "          shift; shift;;\n"
 261   "        --restartdir|-d)\n"
 262   "          DMTCP_RESTART_DIR=$2\n"
 263   "          shift; shift;;\n"
 264   "        --ckptdir|-d)\n"
 265   "          DMTCP_CKPT_DIR=$2\n"
 266   "          shift; shift;;\n"
 267   "        --tmpdir|-t)\n"
 268   "          DMTCP_TMPDIR=$2\n"
 269   "          shift; shift;;\n"
 270   "        --no-strict-uid-checking)\n"
 271   "          noStrictUidChecking=\"--no-strict-uid-checking\"\n"
 272   "          shift;;\n"
 273   "        --interval|-i)\n"
 274   "          checkpoint_interval=$2\n"
 275   "          shift; shift;;\n"
 276   "        *)\n"
 277   "          echo \"$0: unrecognized option \'$1\'. See correct usage below\"\n"
 278   "          echo \"$usage_str\"\n"
 279   "          exit;;\n"
 280   "      esac\n"
 281   "    elif [ $1 = \"--help\" ]; then\n"
 282   "      echo \"$usage_str\"\n"
 283   "      exit\n"
 284   "    else\n"
 285   "      echo \"$0: Incorrect usage.  See correct usage below\"\n"
 286   "      echo\n"
 287   "      echo \"$usage_str\"\n"
 288   "      exit\n"
 289   "    fi\n"
 290   "  done\n"
 291   "fi\n\n"
 292 ;
 293 
// Shell fragment that restarts every checkpoint image with one local
// dmtcp_restart invocation.
//
// It builds ckpt_files from $given_ckpt_files; when DMTCP_RESTART_DIR is set,
// each file is replaced by $DMTCP_RESTART_DIR/<basename> and prepended to the
// list, so the order is reversed in that case.  It then builds the optional
// "--tmpdir ..." and "--ckptdir ..." arguments and finally exec's
// $dmt_rstr_cmd.
// The variables given_ckpt_files, coord_host, coord_port, dmt_rstr_cmd,
// maybejoin, checkpoint_interval and noStrictUidChecking are not assigned in
// this fragment; they must come from script text emitted before it.
static const char* theRestartScriptSingleHostProcessing =
  "ckpt_files=\"\"\n"
  "if [ ! -z \"$DMTCP_RESTART_DIR\" ]; then\n"
  "  for tmp in $given_ckpt_files; do\n"
  "    ckpt_files=\"$DMTCP_RESTART_DIR/$(basename $tmp) $ckpt_files\"\n"
  "  done\n"
  "else\n"
  "  ckpt_files=$given_ckpt_files\n"
  "fi\n\n"

  "coordinator_info=\"--coord-host $coord_host --coord-port $coord_port\"\n"

  "tmpdir=\n"
  "if [ ! -z \"$DMTCP_TMPDIR\" ]; then\n"
  "  tmpdir=\"--tmpdir $DMTCP_TMPDIR\"\n"
  "fi\n\n"

  "ckpt_dir=\n"
  "if [ ! -z \"$DMTCP_CKPT_DIR\" ]; then\n"
  "  ckpt_dir=\"--ckptdir $DMTCP_CKPT_DIR\"\n"
  "fi\n\n"

  "exec $dmt_rstr_cmd $coordinator_info $ckpt_dir \\\n"
  "  $maybejoin --interval \"$checkpoint_interval\" $tmpdir $noStrictUidChecking \\\n"
  "  $ckpt_files\n"
;
 320 
 321 static const char* theRestartScriptMultiHostProcessing =
 322   "worker_ckpts_regexp=\\\n"
 323   "\'[^:]*::[ \\t\\n]*\\([^ \\t\\n]\\+\\)[ \\t\\n]*:\\([a-z]\\+\\):[ \\t\\n]*\\([^:]\\+\\)\'\n\n"
 324 
 325   "worker_hosts=$(\\\n"
 326   "  echo $worker_ckpts | sed -e \'s/\'\"$worker_ckpts_regexp\"\'/\\1 /g\')\n"
 327   "restart_modes=$(\\\n"
 328   "  echo $worker_ckpts | sed -e \'s/\'\"$worker_ckpts_regexp\"\'/: \\2/g\')\n"
 329   "ckpt_files_groups=$(\\\n"
 330   "  echo $worker_ckpts | sed -e \'s/\'\"$worker_ckpts_regexp\"\'/: \\3/g\')\n"
 331   "\n"
 332   "if [ ! -z \"$hostfile\" ]; then\n"
 333   "  worker_hosts=$(\\\n"
 334   "    cat \"$hostfile\" | sed -e \'s/#.*//\' -e \'s/[ \\t\\r]*//\' -e \'/^$/ d\')\n"
 335   "fi\n\n"
 336 
 337   "localhost_ckpt_files_group=\n\n"
 338 
 339   "num_worker_hosts=$(echo $worker_hosts | wc -w)\n\n"
 340 
 341   "maybejoin=\n"
 342   "if [ \"$num_worker_hosts\" != \"1\" ]; then\n"
 343   "  maybejoin='--join'\n"
 344   "fi\n\n"
 345 
 346   "for worker_host in $worker_hosts\n"
 347   "do\n\n"
 348   "  ckpt_files_group=$(\\\n"
 349   "    echo $ckpt_files_groups | sed -e \'s/[^:]*:[ \\t\\n]*\\([^:]*\\).*/\\1/\')\n"
 350   "  ckpt_files_groups=$(echo $ckpt_files_groups | sed -e \'s/[^:]*:[^:]*//\')\n"
 351   "\n"
 352   "  mode=$(echo $restart_modes | sed -e \'s/[^:]*:[ \\t\\n]*\\([^:]*\\).*/\\1/\')\n"
 353   "  restart_modes=$(echo $restart_modes | sed -e \'s/[^:]*:[^:]*//\')\n\n"
 354   "  maybexterm=\n"
 355   "  maybebg=\n"
 356   "  case $mode in\n"
 357   "    bg) maybebg=\'bg\';;\n"
 358   "    xterm) maybexterm=xterm;;\n"
 359   "    fg) ;;\n"
 360   "    *) echo \"WARNING: Unknown Mode\";;\n"
 361   "  esac\n\n"
 362   "  if [ -z \"$ckpt_files_group\" ]; then\n"
 363   "    break;\n"
 364   "  fi\n\n"
 365 
 366   "  new_ckpt_files_group=\"\"\n"
 367   "  for tmp in $ckpt_files_group\n"
 368   "  do\n"
 369   "      if  [ ! -z \"$DMTCP_RESTART_DIR\" ]; then\n"
 370   "        tmp=$DMTCP_RESTART_DIR/$(basename $tmp)\n"
 371   "      fi\n"
 372   "      new_ckpt_files_group=\"$new_ckpt_files_group $tmp\"\n"
 373   "  done\n\n"
 374 
 375   "tmpdir=\n"
 376   "if [ ! -z \"$DMTCP_TMPDIR\" ]; then\n"
 377   "  tmpdir=\"--tmpdir $DMTCP_TMPDIR\"\n"
 378   "fi\n\n"
 379 
 380   "  check_local $worker_host\n"
 381   "  if [ \"$is_local_node\" -eq 1 -o \"$num_worker_hosts\" == \"1\" ]; then\n"
 382   "    localhost_ckpt_files_group=\"$new_ckpt_files_group\"\n"
 383   "    continue\n"
 384   "  fi\n"
 385 
 386   "  if [ -z $maybebg ]; then\n"
 387   "    $maybexterm /usr/bin/ssh -t \"$worker_host\" \\\n"
 388   "      $dmt_rstr_cmd --coord-host \"$coord_host\""
 389                                              " --cord-port \"$coord_port\"\\\n"
 390   "      $ckpt_dir --join --interval \"$checkpoint_interval\" $tmpdir \\\n"
 391   "      $new_ckpt_files_group\n"
 392   "  else\n"
 393   "    $maybexterm /usr/bin/ssh \"$worker_host\" \\\n"
 394   // In Open MPI 1.4, without this (sh -c ...), orterun hangs at the
 395   // end of the computation until user presses enter key.
 396   "      \"/bin/sh -c \'$dmt_rstr_cmd --coord-host $coord_host"
 397                                                 " --coord-port $coord_port\\\n"
 398   "      $ckpt_dir --join --interval \"$checkpoint_interval\" $tmpdir \\\n"
 399   "      $new_ckpt_files_group\'\" &\n"
 400   "  fi\n\n"
 401   "done\n\n"
 402   "if [ -n \"$localhost_ckpt_files_group\" ]; then\n"
 403   "exec $dmt_rstr_cmd --coord-host \"$coord_host\""
 404                                            " --coord-port \"$coord_port\" \\\n"
 405   "  $ckpt_dir $maybejoin --interval \"$checkpoint_interval\" $tmpdir $noStrictUidChecking $localhost_ckpt_files_group\n"
 406   "fi\n\n"
 407 
 408   "#wait for them all to finish\n"
 409   "wait\n"
 410 ;
 411 
// Port reported as "Port:" by printStatus(); -1 until it is assigned.
static int thePort = -1;
// NOTE(review): presumably the --port-file path described in theUsage; it is
// not referenced in the part of the file visible here.
static string thePortFile;

// Printed by printStatus() as "Exit on last client".
static bool exitOnLast = false;
// Set by the 'b' prefix command in handleUserCommand(); while it is set,
// updateMinimumState() sends a DMT_USER_CMD_RESULT reply once workers have
// been resumed and then clears it.
static bool blockUntilDone = false;
// Set by the 'x' prefix command; updateMinimumState() then broadcasts
// DMT_KILL_PEER once the checkpoint has been written and clears the flag.
static bool exitAfterCkpt = false;
// Socket descriptor that the blocking-checkpoint reply is written to; reset
// to -1 after the reply has been sent.
static int blockUntilDoneRemote = -1;

// The single coordinator instance for this process.
static DmtcpCoordinator prog;
 421 
/* The coordinator can receive a second checkpoint request while it is still
 * processing the first one.  If the second request arrived after the
 * coordinator had broadcast the DMT_DO_SUSPEND message but before the workers
 * had replied, the coordinator would send another DMT_DO_SUSPEND message.
 * The workers, having replied to the first DMT_DO_SUSPEND message (by
 * suspending all the user threads), are waiting for the next message
 * (DMT_DO_FD_LEADER_ELECTION or DMT_KILL_PEER); on receiving a second
 * DMT_DO_SUSPEND message instead, they would exit() indicating an error.
 * To prevent this, the global variable "workersRunningAndSuspendMsgSent"
 * records, as the name implies, that the DMT_DO_SUSPEND message has been sent
 * and that the coordinator is waiting for replies from the workers.  While
 * this variable is set, the coordinator will not process another checkpoint
 * request.
 */
// True while a suspend request has been broadcast and worker replies are
// still outstanding.
static bool workersRunningAndSuspendMsgSent = false;

// NOTE(review): killInProgress is not referenced in the visible part of the
// file; its exact meaning should be confirmed against its users.
static bool killInProgress = false;
// Set to true by onData() when a DMT_UNIQUE_CKPT_FILENAME message arrives.
static bool uniqueCkptFilenames = false;

/* If dmtcp_launch/dmtcp_restart specifies '-i', theCheckpointInterval
 * will be reset accordingly (valid for current computation).  If dmtcp_command
 * specifies '-i' (or if user interactively invokes 'i' in coordinator),
 * then both theCheckpointInterval and theDefaultCheckpointInterval are set.
 * A value of '0' means:  never checkpoint (manual checkpoint only).
 */
static uint32_t theCheckpointInterval = 0; /* Current checkpoint interval */
static uint32_t theDefaultCheckpointInterval = 0; /* Reset to this on new comp. */
// Cleared by updateMinimumState() after DMT_DO_RESUME has been broadcast.
static bool isRestarting = false;
static bool timerExpired = false;

// Forward declaration; called by updateMinimumState() after workers resume.
static void resetCkptTimer();

// File descriptor number of the process's standard input stream.
const int STDIN_FD = fileno ( stdin );

// Timers that updateMinimumState() stops with JTIMER_STOP.
JTIMER ( checkpoint );
JTIMER ( restart );
 457 JTIMER ( restart );
 458 
// Identifier of the current computation; printed by printStatus() as
// "Computation Id".
static UniquePid compId;
static int numPeers = -1;
static time_t curTimeStamp = -1;
static time_t ckptTimeStamp = -1;

// Name-service database; updateMinimumState() resets it before broadcasting
// DMT_DO_REGISTER_NAME_SERVICE_DATA (COORD_NAMESERVICE builds only).
static LookupService lookupService;

// Host name and IP address printed by printStatus() on the "Host:" line.
static string coordHostname;
static struct in_addr localhostIPAddr;

static string tmpDir;
// Printed by printStatus() as "Checkpoint Dir".
static string ckptDir;

// Capacity of the epoll event array below.
#define MAX_EVENTS 10000
// NOTE(review): presumably filled and consumed by eventLoop(), which is not
// visible here.
struct epoll_event events[MAX_EVENTS];
int epollFd;
// Listening socket; closed by the 'q' command in handleUserCommand().
static jalib::JSocket *listenSock = NULL;

static void removeStaleSharedAreaFile();
static void preExitCleanup();

// Next candidate handed out by DmtcpCoordinator::getNewVirtualPid().
static pid_t _nextVirtualPid = INITIAL_VIRTUAL_PID;

// Sequence number assigned to the next CoordClient that is constructed.
static int theNextClientNumber = 1;
// Connected clients; listed by the 'l' command and closed by 'q'.
vector<CoordClient*> clients;
 484 
 485 CoordClient::CoordClient(const jalib::JSocket& sock,
 486                          const struct sockaddr_storage *addr,
 487                          socklen_t len,
 488                          DmtcpMessage &hello_remote,
 489                          int isNSWorker)
 490   : _sock(sock)
 491 {
 492   _isNSWorker = isNSWorker;
 493   _realPid = hello_remote.realPid;
 494   _clientNumber = theNextClientNumber++;
 495   _identity = hello_remote.from;
 496   _state = hello_remote.state;
 497   struct sockaddr_in *in = (struct sockaddr_in*) addr;
 498   _ip = inet_ntoa(in->sin_addr);
 499 }
 500 
 501 void CoordClient::readProcessInfo(DmtcpMessage& msg)
 502 {
 503   if (msg.extraBytes > 0) {
 504     char* extraData = new char[msg.extraBytes];
 505     _sock.readAll(extraData, msg.extraBytes);
 506     _hostname = extraData;
 507     _progname = extraData + _hostname.length() + 1;
 508     delete [] extraData;
 509   }
 510 }
 511 
 512 pid_t DmtcpCoordinator::getNewVirtualPid()
 513 {
 514   pid_t pid = -1;
 515   JASSERT(_virtualPidToClientMap.size() < MAX_VIRTUAL_PID/1000)
 516     .Text("Exceeded maximum number of processes allowed");
 517   while (1) {
 518     pid = _nextVirtualPid;
 519     _nextVirtualPid += 1000;
 520     if (_nextVirtualPid > MAX_VIRTUAL_PID) {
 521       _nextVirtualPid = INITIAL_VIRTUAL_PID;
 522     }
 523     if (_virtualPidToClientMap.find(pid) == _virtualPidToClientMap.end()) {
 524       break;
 525     }
 526   }
 527   JASSERT(pid != -1) .Text("Not Reachable");
 528   return pid;
 529 }
 530 
 531 void DmtcpCoordinator::handleUserCommand(char cmd, DmtcpMessage* reply /*= NULL*/)
 532 {
 533   if (reply != NULL) reply->coordCmdStatus = CoordCmdStatus::NOERROR;
 534 
 535   switch ( cmd ){
 536   case 'b': case 'B':  // prefix blocking command, prior to checkpoint command
 537     JTRACE ( "blocking checkpoint beginning..." );
 538     blockUntilDone = true;
 539     break;
 540   case 'x': case 'X':  // prefix exit command, prior to checkpoint command
 541     JTRACE ( "Will exit after creating the checkpoint..." );
 542     exitAfterCkpt = true;
 543     break;
 544   case 'c': case 'C':
 545     JTRACE ( "checkpointing..." );
 546     if(startCheckpoint()){
 547       if (reply != NULL) reply->numPeers = getStatus().numPeers;
 548     }else{
 549       if (reply != NULL) reply->coordCmdStatus = CoordCmdStatus::ERROR_NOT_RUNNING_STATE;
 550     }
 551     break;
 552   case 'i': case 'I':
 553     JTRACE("setting checkpoint interval...");
 554     updateCheckpointInterval ( theCheckpointInterval );
 555     if (theCheckpointInterval == 0)
 556       printf("Current Checkpoint Interval:"
 557              " Disabled (checkpoint manually instead)\n");
 558     else
 559       printf("Current Checkpoint Interval: %d\n", theCheckpointInterval);
 560     if (theDefaultCheckpointInterval == 0)
 561       printf("Default Checkpoint Interval:"
 562              " Disabled (checkpoint manually instead)\n");
 563     else
 564       printf("Default Checkpoint Interval: %d\n", theDefaultCheckpointInterval);
 565     break;
 566   case 'l': case 'L':
 567   case 't': case 'T':
 568     JASSERT_STDERR << "Client List:\n";
 569     JASSERT_STDERR << "#, PROG[virtPID:realPID]@HOST, DMTCP-UNIQUEPID, STATE\n";
 570     for (size_t i = 0; i < clients.size(); i++) {
 571       JASSERT_STDERR << clients[i]->clientNumber()
 572         << ", " << clients[i]->progname()
 573         << "[" << clients[i]->identity().pid() << ":" << clients[i]->realPid()
 574         << "]@" << clients[i]->hostname()
 575 #ifdef PRINT_REMOTE_IP
 576         << "(" << clients[i]->ip() << ")"
 577 #endif
 578         << ", " << clients[i]->identity()
 579         << ", " << clients[i]->state().toString()
 580         << '\n';
 581     }
 582     break;
 583   case 'q': case 'Q':
 584   {
 585     JNOTE ( "killing all connected peers and quitting ..." );
 586     broadcastMessage ( DMT_KILL_PEER );
 587     JASSERT_STDERR << "DMTCP coordinator exiting... (per request)\n";
 588     for (size_t i = 0; i < clients.size(); i++) {
 589       clients[i]->sock().close();
 590     }
 591     listenSock->close();
 592     preExitCleanup();
 593     JTRACE ("Exiting ...");
 594     exit ( 0 );
 595     break;
 596   }
 597   case 'k': case 'K':
 598     JNOTE ( "Killing all connected Peers..." );
 599     //FIXME: What happens if a 'k' command is followed by a 'c' command before
 600     //       the *real* broadcast takes place?         --Kapil
 601     broadcastMessage ( DMT_KILL_PEER );
 602     break;
 603   case 'h': case 'H': case '?':
 604     JASSERT_STDERR << theHelpMessage;
 605     break;
 606   case 's': case 'S':
 607   {
 608     ComputationStatus s = getStatus();
 609     bool running = (s.minimumStateUnanimous &&
 610                     s.minimumState==WorkerState::RUNNING);
 611     if (reply != NULL) {
 612       reply->numPeers = s.numPeers;
 613       reply->isRunning = running;
 614       reply->theCheckpointInterval = theCheckpointInterval;
 615     } else {
 616       printStatus(s.numPeers, running);
 617     }
 618     break;
 619   }
 620   case ' ': case '\t': case '\n': case '\r':
 621     //ignore whitespace
 622     break;
 623   default:
 624     JTRACE("unhandled user command")(cmd);
 625     if (reply != NULL){
 626       reply->coordCmdStatus = CoordCmdStatus::ERROR_INVALID_COMMAND;
 627     }
 628   }
 629   return;
 630 }
 631 
 632 void DmtcpCoordinator::printStatus(size_t numPeers, bool isRunning)
 633 {
 634   ostringstream o;
 635   o << "Status..." << std::endl
 636     << "Host: " << coordHostname
 637     << " (" << inet_ntoa(localhostIPAddr) << ")" << std::endl
 638     << "Port: " << thePort << std::endl
 639     << "Checkpoint Interval: ";
 640 
 641   if (theCheckpointInterval == 0) {
 642     o << "disabled (checkpoint manually instead)" << std::endl;
 643   } else {
 644     o << theCheckpointInterval << std::endl;
 645   }
 646 
 647   o << "Exit on last client: " << exitOnLast << std::endl
 648     << "Exit after checkpoint: " << exitAfterCkpt << std::endl
 649     << "Computation Id: " << compId << std::endl
 650     << "Checkpoint Dir: " << ckptDir << std::endl
 651     << "NUM_PEERS=" << numPeers << std::endl
 652     << "RUNNING=" << (isRunning ? "yes" : "no") << std::endl;
 653   printf("%s", o.str().c_str());
 654   fflush(stdout);
 655 }
 656 
// Advances the checkpoint/restart protocol after a worker has reported a new
// state.
//
//   oldState  the state the reporting worker was in before its update
//             (onData() passes the client's previous state)
//
// The new overall minimum is obtained from minimumState().  Each
// (oldState, new minimum) pair below corresponds to one step of the protocol
// and triggers the broadcast that starts the next step.  Which steps exist
// between CHECKPOINTED and REFILLED depends on whether COORD_NAMESERVICE is
// defined.
void DmtcpCoordinator::updateMinimumState(WorkerState oldState)
{
  WorkerState newState = minimumState();

  // RUNNING -> SUSPENDED: start file-descriptor leader election.
  if ( oldState == WorkerState::RUNNING
       && newState == WorkerState::SUSPENDED )
  {
    JNOTE ( "locking all nodes" );
    broadcastMessage(DMT_DO_FD_LEADER_ELECTION, getStatus().numPeers );
  }
  // SUSPENDED -> FD_LEADER_ELECTION: ask workers to drain.
  if ( oldState == WorkerState::SUSPENDED
       && newState == WorkerState::FD_LEADER_ELECTION )
  {
    JNOTE ( "draining all nodes" );
    broadcastMessage ( DMT_DO_DRAIN );
  }
  // FD_LEADER_ELECTION -> DRAINED: ask workers to write their checkpoints.
  if ( oldState == WorkerState::FD_LEADER_ELECTION
       && newState == WorkerState::DRAINED )
  {
    JNOTE ( "checkpointing all nodes" );
    broadcastMessage ( DMT_DO_CHECKPOINT );
  }

#ifdef COORD_NAMESERVICE
  // DRAINED -> CHECKPOINTED: write the restart script, then either kill the
  // peers (exit-after-checkpoint requested) or start name-service
  // registration.
  if ( oldState == WorkerState::DRAINED
       && newState == WorkerState::CHECKPOINTED )
  {
    writeRestartScript();
    if (exitAfterCkpt) {
      JNOTE("Checkpoint Done. Killing all peers.");
      broadcastMessage(DMT_KILL_PEER);
      exitAfterCkpt = false;
    } else {
      JNOTE ( "building name service database" );
      lookupService.reset();
      broadcastMessage ( DMT_DO_REGISTER_NAME_SERVICE_DATA );
    }
  }
  // RESTARTING -> CHECKPOINTED: restart path joins the protocol at
  // name-service registration.
  if ( oldState == WorkerState::RESTARTING
       && newState == WorkerState::CHECKPOINTED )
  {
    JTIMER_STOP ( restart );

    lookupService.reset();
    JNOTE ( "building name service database (after restart)" );
    broadcastMessage ( DMT_DO_REGISTER_NAME_SERVICE_DATA );
  }
  // CHECKPOINTED -> NAME_SERVICE_DATA_REGISTERED: let workers send queries.
  if ( oldState == WorkerState::CHECKPOINTED
       && newState == WorkerState::NAME_SERVICE_DATA_REGISTERED ){
    JNOTE ( "entertaining queries now" );
    broadcastMessage ( DMT_DO_SEND_QUERIES );
  }
  // NAME_SERVICE_DATA_REGISTERED -> DONE_QUERYING: ask workers to refill.
  if ( oldState == WorkerState::NAME_SERVICE_DATA_REGISTERED
       && newState == WorkerState::DONE_QUERYING ){
    JNOTE ( "refilling all nodes" );
    broadcastMessage ( DMT_DO_REFILL );
  }
  // DONE_QUERYING -> REFILLED: this `if` has no body of its own here; the
  // brace block after the #endif is its body.
  if ( oldState == WorkerState::DONE_QUERYING
       && newState == WorkerState::REFILLED )
    /* then broadcastMessage(DMT_DO_RESUME) after the #endif, below */
#else
  // DRAINED -> CHECKPOINTED: write the restart script, then either kill the
  // peers (exit-after-checkpoint requested) or ask workers to refill.
  if ( oldState == WorkerState::DRAINED
       && newState == WorkerState::CHECKPOINTED )
  {
    writeRestartScript();
    if (exitAfterCkpt) {
      JNOTE("Checkpoint Done. Killing all peers.");
      broadcastMessage(DMT_KILL_PEER);
      exitAfterCkpt = false;
    } else {
      JNOTE ( "refilling all nodes" );
      broadcastMessage ( DMT_DO_REFILL );
    }
  }
  // RESTARTING -> CHECKPOINTED: restart path joins the protocol at refill.
  if ( oldState == WorkerState::RESTARTING
       && newState == WorkerState::CHECKPOINTED )
  {
    JTIMER_STOP ( restart );

    JNOTE ( "refilling all nodes (after checkpoint)" );
    broadcastMessage ( DMT_DO_REFILL );
  }
  // CHECKPOINTED -> REFILLED: this `if` has no body of its own here; the
  // brace block after the #endif is its body.
  if ( oldState == WorkerState::CHECKPOINTED
       && newState == WorkerState::REFILLED )
#endif
  {
    // Shared final step for both build variants: resume the workers, stop the
    // checkpoint timer, clear the restart flag and re-arm the interval timer.
    JNOTE ( "restarting all nodes" );
    broadcastMessage ( DMT_DO_RESUME );

    JTIMER_STOP ( checkpoint );
    isRestarting = false;

    resetCkptTimer();

    // A blocking checkpoint request ('b' prefix) is answered only now.
    if (blockUntilDone) {
      DmtcpMessage blockUntilDoneReply(DMT_USER_CMD_RESULT);
      JNOTE ( "replying to dmtcp_command:  we're done" );
      // These were set in DmtcpCoordinator::onConnect in this file
      jalib::JSocket remote ( blockUntilDoneRemote );
      remote << blockUntilDoneReply;
      remote.close();
      blockUntilDone = false;
      blockUntilDoneRemote = -1;
    }
  }
}
 763 
 764 void DmtcpCoordinator::onData(CoordClient *client)
 765 {
 766   DmtcpMessage msg;
 767   JASSERT(client != NULL);
 768 
 769   client->sock() >> msg;
 770   msg.assertValid();
 771   char *extraData = 0;
 772   if (msg.extraBytes > 0) {
 773     extraData = new char[msg.extraBytes];
 774     client->sock().readAll(extraData, msg.extraBytes);
 775   }
 776 
 777   switch ( msg.type )
 778   {
 779     case DMT_OK:
 780     {
 781       WorkerState oldState = client->state();
 782       client->setState ( msg.state );
 783       ComputationStatus s = getStatus();
 784       WorkerState newState = s.minimumState;
 785 
 786       JTRACE ("got DMT_OK message")
 787         ( oldState )( msg.from )( msg.state )( newState );
 788 
 789       updateMinimumState(oldState);
 790       break;
 791     }
 792     case DMT_UNIQUE_CKPT_FILENAME:
 793       uniqueCkptFilenames = true;
 794       // Fall though
 795     case DMT_CKPT_FILENAME:
 796     {
 797       JASSERT ( extraData!=0 )
 798         .Text ( "extra data expected with DMT_CKPT_FILENAME message" );
 799       string ckptFilename;
 800       string hostname;
 801       ckptFilename = extraData;
 802       hostname = extraData + ckptFilename.length() + 1;
 803 
 804       JTRACE ( "recording restart info" ) ( ckptFilename ) ( hostname );
 805       _restartFilenames[hostname].push_back ( ckptFilename );
 806     }
 807     break;
 808     case DMT_GET_CKPT_DIR:
 809     {
 810       DmtcpMessage reply(DMT_GET_CKPT_DIR_RESULT);
 811       reply.extraBytes = ckptDir.length() + 1;
 812       client->sock() << reply;
 813       client->sock().writeAll(ckptDir.c_str(), reply.extraBytes);
 814     }
 815     break;
 816     case DMT_UPDATE_CKPT_DIR:
 817     {
 818       JASSERT(extraData != 0)
 819         .Text("extra data expected with DMT_UPDATE_CKPT_DIR message");
 820       if (strcmp(ckptDir.c_str(), extraData) != 0) {
 821         ckptDir = extraData;
 822         JNOTE("Updated ckptDir") (ckptDir);
 823       }
 824     }
 825     break;
 826 
 827 #ifdef COORD_NAMESERVICE
 828     case DMT_REGISTER_NAME_SERVICE_DATA:
 829     {
 830       JTRACE ("received REGISTER_NAME_SERVICE_DATA msg") (client->identity());
 831       lookupService.registerData(msg, (const void*) extraData);
 832     }
 833     break;
 834 
 835     case DMT_REGISTER_NAME_SERVICE_DATA_SYNC:
 836     {
 837       JTRACE ("received REGISTER_NAME_SERVICE_DATA_SYNC msg") (client->identity());
 838       lookupService.registerData(msg, (const void*) extraData);
 839       DmtcpMessage response(DMT_REGISTER_NAME_SERVICE_DATA_SYNC_RESPONSE);
 840       JTRACE("Sending NS response to the client...");
 841       client->sock() << response;
 842     }
 843     break;
 844     case DMT_NAME_SERVICE_QUERY:
 845     {
 846       JTRACE ("received NAME_SERVICE_QUERY msg") (client->identity());
 847       lookupService.respondToQuery(client->sock(), msg,
 848                                    (const void*) extraData);
 849     }
 850     break;
 851 #endif
 852 
 853     case DMT_UPDATE_PROCESS_INFO_AFTER_FORK:
 854     {
 855         JNOTE("Updating process Information after fork()")
 856           (client->hostname()) (client->progname()) (msg.from) (client->identity());
 857         client->identity(msg.from);
 858         client->realPid(msg.realPid);
 859     }
 860     break;
 861     case DMT_UPDATE_PROCESS_INFO_AFTER_INIT_OR_EXEC:
 862     {
 863       string progname = extraData;
 864       JNOTE("Updating process Information after exec()")
 865         (progname) (msg.from) (client->identity());
 866       client->progname(progname);
 867       client->identity(msg.from);
 868     }
 869     break;
 870 
 871     case DMT_NULL:
 872       JWARNING(false) (msg.type) .Text("unexpected message from worker. Closing connection");
 873       onDisconnect(client);
 874       break;
 875     default:
 876       JASSERT ( false ) ( msg.from ) ( msg.type )
 877         .Text ( "unexpected message from worker" );
 878   }
 879 
 880   delete[] extraData;
 881 }
 882 
 883 
 884 static void removeStaleSharedAreaFile()
 885 {
 886   ostringstream o;
 887   o << tmpDir
 888     << "/dmtcpSharedArea." << compId << "." << std::hex << curTimeStamp;
 889   JTRACE("Removing sharedArea file.") (o.str());
 890   unlink(o.str().c_str());
 891 }
 892 
 893 static void preExitCleanup()
 894 {
 895   removeStaleSharedAreaFile();
 896   JTRACE("Removing port-file") (thePortFile);
 897   unlink(thePortFile.c_str());
 898 }
 899 
 900 void DmtcpCoordinator::onDisconnect(CoordClient *client)
 901 {
 902   if (client->isNSWorker()) {
 903     client->sock().close();
 904     delete client;
 905     return;
 906   }
 907   for (size_t i = 0; i < clients.size(); i++) {
 908     if (clients[i] == client) {
 909       clients.erase(clients.begin() + i);
 910       break;
 911     }
 912   }
 913   client->sock().close();
 914   JNOTE ( "client disconnected" ) ( client->identity() );
 915   _virtualPidToClientMap.erase(client->virtualPid());
 916 
 917   ComputationStatus s = getStatus();
 918   if (s.numPeers < 1) {
 919     if (exitOnLast) {
 920       JNOTE ("last client exited, shutting down..");
 921       handleUserCommand('q');
 922     } else {
 923       removeStaleSharedAreaFile();
 924     }
 925     // If a kill in is progress, the coordinator refuses any new connections,
 926     // thus we need to reset it to false once all the processes in the
 927     // computations have disconnected.
 928     killInProgress = false;
 929     if (theCheckpointInterval != theDefaultCheckpointInterval) {
 930       updateCheckpointInterval(theDefaultCheckpointInterval);
 931       JNOTE ( "CheckpointInterval reset on end of current computation" )
 932         ( theCheckpointInterval );
 933     }
 934   } else {
 935     updateMinimumState(client->state());
 936   }
 937 }
 938 
 939 void DmtcpCoordinator::initializeComputation()
 940 {
 941   //this is the first connection, do some initializations
 942   workersRunningAndSuspendMsgSent = false;
 943   killInProgress = false;
 944   //_nextVirtualPid = INITIAL_VIRTUAL_PID;
 945 
 946   // theCheckpointInterval can be overridden later by msg from this client.
 947   updateCheckpointInterval( theDefaultCheckpointInterval );
 948 
 949   // drop current computation group to 0
 950   compId = UniquePid(0,0,0);
 951   curTimeStamp = 0; // Drop timestamp to 0
 952   numPeers = -1; // Drop number of peers to unknown
 953   blockUntilDone = false;
 954   //exitAfterCkpt = false;
 955 }
 956 
 957 void DmtcpCoordinator::onConnect()
 958 {
 959   struct sockaddr_storage remoteAddr;
 960   socklen_t remoteLen = sizeof(remoteAddr);
 961   jalib::JSocket remote = listenSock->accept(&remoteAddr, &remoteLen);
 962   JTRACE("accepting new connection") (remote.sockfd()) (JASSERT_ERRNO);
 963 
 964   if (!remote.isValid()) {
 965     remote.close();
 966     return;
 967   }
 968 
 969   DmtcpMessage hello_remote;
 970   hello_remote.poison();
 971   JTRACE("Reading from incoming connection...");
 972   remote >> hello_remote;
 973   if (!remote.isValid()) {
 974     remote.close();
 975     return;
 976   }
 977 
 978 #ifdef COORD_NAMESERVICE
 979   if (hello_remote.type == DMT_NAME_SERVICE_WORKER) {
 980     CoordClient *client = new CoordClient(remote, &remoteAddr, remoteLen,
 981                                           hello_remote);
 982 
 983     addDataSocket(client);
 984     return;
 985   }
 986   if (hello_remote.type == DMT_NAME_SERVICE_QUERY) {
 987     JASSERT(hello_remote.extraBytes > 0) (hello_remote.extraBytes);
 988     char *extraData = new char[hello_remote.extraBytes];
 989     remote.readAll(extraData, hello_remote.extraBytes);
 990 
 991     JTRACE ("received NAME_SERVICE_QUERY msg on running") (hello_remote.from);
 992     lookupService.respondToQuery(remote, hello_remote, extraData);
 993     delete [] extraData;
 994     remote.close();
 995     return;
 996   }
 997   if (hello_remote.type == DMT_REGISTER_NAME_SERVICE_DATA) {
 998     JASSERT(hello_remote.extraBytes > 0) (hello_remote.extraBytes);
 999     char *extraData = new char[hello_remote.extraBytes];
1000     remote.readAll(extraData, hello_remote.extraBytes);
1001 
1002     JTRACE ("received REGISTER_NAME_SERVICE_DATA msg on running") (hello_remote.from);
1003     lookupService.registerData(hello_remote, (const void*) extraData);
1004     delete [] extraData;
1005     remote.close();
1006     return;
1007   }
1008   if (hello_remote.type == DMT_REGISTER_NAME_SERVICE_DATA_SYNC) {
1009     JASSERT(hello_remote.extraBytes > 0) (hello_remote.extraBytes);
1010     char *extraData = new char[hello_remote.extraBytes];
1011     remote.readAll(extraData, hello_remote.extraBytes);
1012 
1013     JTRACE ("received REGISTER_NAME_SERVICE_DATA msg on running") (hello_remote.from);
1014     lookupService.registerData(hello_remote, (const void*) extraData);
1015     delete [] extraData;
1016     DmtcpMessage response(DMT_REGISTER_NAME_SERVICE_DATA_SYNC_RESPONSE);
1017     JTRACE("Reading from incoming connection...");
1018     remote << response;
1019     remote.close();
1020     return;
1021   }
1022 #endif
1023 
1024   if (hello_remote.type == DMT_USER_CMD) {
1025     // TODO(kapil): Update ckpt interval only if a valid one was supplied to
1026     // dmtcp_command.
1027     updateCheckpointInterval(hello_remote.theCheckpointInterval);
1028     processDmtUserCmd(hello_remote, remote);
1029     return;
1030   }
1031 
1032   if (killInProgress) {
1033     JNOTE("Connection request received in the middle of killing computation. "
1034           "Sending it the kill message.");
1035     DmtcpMessage msg;
1036     msg.type = DMT_KILL_PEER;
1037     remote << msg;
1038     remote.close();
1039     return;
1040   }
1041 
1042   // If no client is connected to Coordinator, then there can be only zero data
1043   // sockets OR there can be one data socket and that should be STDIN.
1044   if (clients.size() == 0) {
1045     initializeComputation();
1046   }
1047 
1048   CoordClient *client = new CoordClient(remote, &remoteAddr, remoteLen,
1049                                         hello_remote);
1050 
1051   if( hello_remote.extraBytes > 0 ){
1052     client->readProcessInfo(hello_remote);
1053   }
1054 
1055   if (hello_remote.type == DMT_RESTART_WORKER) {
1056     if (!validateRestartingWorkerProcess(hello_remote, remote,
1057                                          &remoteAddr, remoteLen)) {
1058       return;
1059     }
1060     client->virtualPid(hello_remote.from.pid());
1061     _virtualPidToClientMap[client->virtualPid()] = client;
1062     isRestarting = true;
1063   } else if (hello_remote.type == DMT_NEW_WORKER) {
1064     JASSERT(hello_remote.state == WorkerState::RUNNING ||
1065             hello_remote.state == WorkerState::UNKNOWN);
1066     JASSERT(hello_remote.virtualPid == -1);
1067     client->virtualPid(getNewVirtualPid());
1068     if (!validateNewWorkerProcess(hello_remote, remote, client,
1069                                   &remoteAddr, remoteLen)) {
1070       return;
1071     }
1072     _virtualPidToClientMap[client->virtualPid()] = client;
1073   } else {
1074     JASSERT(false) (hello_remote.type)
1075       .Text("Connect request from Unknown Remote Process Type");
1076   }
1077 
1078   updateCheckpointInterval(hello_remote.theCheckpointInterval);
1079   JNOTE ( "worker connected" ) ( hello_remote.from );
1080 
1081   clients.push_back(client);
1082   addDataSocket(client);
1083 
1084   JTRACE("END") (clients.size());
1085 }
1086 
1087 void DmtcpCoordinator::processDmtUserCmd(DmtcpMessage& hello_remote,
1088                                          jalib::JSocket& remote )
1089 {
1090   //dmtcp_command doesn't handshake (it is antisocial)
1091   JTRACE("got user command from dmtcp_command")(hello_remote.coordCmd);
1092   DmtcpMessage reply;
1093   reply.type = DMT_USER_CMD_RESULT;
1094   // if previous 'b' blocking prefix command had set blockUntilDone
1095   if (blockUntilDone && blockUntilDoneRemote == -1  &&
1096       hello_remote.coordCmd == 'c') {
1097     // Reply will be done in DmtcpCoordinator::onData in this file.
1098     blockUntilDoneRemote = remote.sockfd();
1099     handleUserCommand( hello_remote.coordCmd, &reply );
1100   } else if ( (hello_remote.coordCmd == 'i')
1101                && hello_remote.theCheckpointInterval >= 0 ) {
1102 //    theDefaultCheckpointInterval = hello_remote.theCheckpointInterval;
1103 //    theCheckpointInterval = theDefaultCheckpointInterval;
1104     handleUserCommand( hello_remote.coordCmd, &reply );
1105     remote << reply;
1106     remote.close();
1107   } else {
1108     handleUserCommand( hello_remote.coordCmd, &reply );
1109     remote << reply;
1110     remote.close();
1111   }
1112   return;
1113 }
1114 
1115 bool DmtcpCoordinator::validateRestartingWorkerProcess(
1116     DmtcpMessage& hello_remote,
1117     jalib::JSocket& remote,
1118     const struct sockaddr_storage* remoteAddr,
1119     socklen_t remoteLen)
1120 {
1121   struct timeval tv;
1122   const struct sockaddr_in *sin = (const struct sockaddr_in*) remoteAddr;
1123   string remoteIP = inet_ntoa(sin->sin_addr);
1124   DmtcpMessage hello_local ( DMT_ACCEPT );
1125 
1126   JASSERT(hello_remote.state == WorkerState::RESTARTING) (hello_remote.state);
1127 
1128   if (compId == UniquePid(0,0,0)) {
1129     JASSERT ( minimumState() == WorkerState::UNKNOWN )
1130       .Text ( "Coordinator should be idle at this moment" );
1131     // Coordinator is free at this moment - set up all the things
1132     compId = hello_remote.compGroup;
1133     numPeers = hello_remote.numPeers;
1134     JASSERT(gettimeofday(&tv, NULL) == 0);
1135     // Get the resolution down to 100 mili seconds.
1136     curTimeStamp = (tv.tv_sec << 4) | (tv.tv_usec / (100*1000));
1137     JNOTE ( "FIRST dmtcp_restart connection.  Set numPeers. Generate timestamp" )
1138       ( numPeers ) ( curTimeStamp ) ( compId );
1139     JTIMER_START(restart);
1140   } else if (minimumState() != WorkerState::RESTARTING &&
1141              minimumState() != WorkerState::CHECKPOINTED) {
1142     JNOTE ("Computation not in RESTARTING or CHECKPOINTED state."
1143            "  Reject incoming computation process requesting restart.")
1144       (compId) (hello_remote.compGroup) (minimumState());
1145     hello_local.type = DMT_REJECT_NOT_RESTARTING;
1146     remote << hello_local;
1147     remote.close();
1148     return false;
1149   } else if ( hello_remote.compGroup != compId) {
1150     JNOTE ("Reject incoming computation process requesting restart,"
1151            " since it is not from current computation.")
1152       ( compId ) ( hello_remote.compGroup );
1153     hello_local.type = DMT_REJECT_WRONG_COMP;
1154     remote << hello_local;
1155     remote.close();
1156     return false;
1157   }
1158   // dmtcp_restart already connected and compGroup created.
1159   // Computation process connection
1160   JASSERT ( curTimeStamp != 0 );
1161 
1162   JTRACE("Connection from (restarting) computation process")
1163     ( compId ) ( hello_remote.compGroup ) ( minimumState() );
1164 
1165   hello_local.coordTimeStamp = curTimeStamp;
1166   if (Util::strStartsWith(remoteIP, "127.")) {
1167     memcpy(&hello_local.ipAddr, &localhostIPAddr, sizeof localhostIPAddr);
1168   } else {
1169     memcpy(&hello_local.ipAddr, &sin->sin_addr, sizeof localhostIPAddr);
1170   }
1171   remote << hello_local;
1172 
1173   // NOTE: Sending the same message twice. We want to make sure that the
1174   // worker process receives/processes the first messages as soon as it
1175   // connects to the coordinator. The second message will be processed in
1176   // postRestart routine in DmtcpWorker.
1177   //
1178   // The reason to do this is the following. The dmtcp_restart process
1179   // connects to the coordinator at a very early stage. Later on, before
1180   // exec()'ing into mtcp_restart, it reconnects to the coordinator using
1181   // it's original UniquiePid and closes the earlier socket connection.
1182   // However, the coordinator might process the disconnect() before it
1183   // processes the connect() which would lead to a situation where the
1184   // coordinator is not connected to any worker processes. The coordinator
1185   // would now process the connect() and may reject the worker because the
1186   // worker state is RESTARTING, but the minimumState() is UNKNOWN.
1187   //remote << hello_local;
1188 
1189   return true;
1190 }
1191 
1192 bool DmtcpCoordinator::validateNewWorkerProcess(
1193     DmtcpMessage& hello_remote,
1194     jalib::JSocket& remote,
1195     CoordClient *client,
1196     const struct sockaddr_storage* remoteAddr,
1197     socklen_t remoteLen)
1198 {
1199   const struct sockaddr_in *sin = (const struct sockaddr_in*) remoteAddr;
1200   string remoteIP = inet_ntoa(sin->sin_addr);
1201   DmtcpMessage hello_local(DMT_ACCEPT);
1202   hello_local.virtualPid = client->virtualPid();
1203   ComputationStatus s = getStatus();
1204 
1205   JASSERT(hello_remote.state == WorkerState::RUNNING ||
1206           hello_remote.state == WorkerState::UNKNOWN) (hello_remote.state);
1207 
1208   if (workersRunningAndSuspendMsgSent == true) {
1209     /* Worker trying to connect after SUSPEND message has been sent.
1210      * This happens if the worker process is executing a fork() system call
1211      * when the DMT_DO_SUSPEND is broadcast. We need to make sure that the
1212      * child process is allowed to participate in the current checkpoint.
1213      */
1214     JASSERT(s.numPeers > 0) (s.numPeers);
1215     JASSERT(s.minimumState != WorkerState::SUSPENDED) (s.minimumState);
1216 
1217     // Handshake
1218     hello_local.compGroup = compId;
1219     remote << hello_local;
1220 
1221     // Now send DMT_DO_SUSPEND message so that this process can also
1222     // participate in the current checkpoint
1223     DmtcpMessage suspendMsg (DMT_DO_SUSPEND);
1224     suspendMsg.compGroup = compId;
1225     remote << suspendMsg;
1226 
1227   } else if (s.numPeers > 0 && s.minimumState != WorkerState::RUNNING &&
1228              s.minimumState != WorkerState::UNKNOWN) {
1229     // If some of the processes are not in RUNNING state
1230     JNOTE("Current computation not in RUNNING state."
1231           "  Refusing to accept new connections.")
1232       (compId) (hello_remote.from)
1233       (s.numPeers) (s.minimumState);
1234     hello_local.type = DMT_REJECT_NOT_RUNNING;
1235     remote << hello_local;
1236     remote.close();
1237     return false;
1238 
1239   } else if (hello_remote.compGroup != UniquePid()) {
1240     // New Process trying to connect to Coordinator but already has compGroup
1241     JNOTE ("New process not part of currently running computation group"
1242            "on this coordinator.  Rejecting.")
1243       (hello_remote.compGroup);
1244 
1245     hello_local.type = DMT_REJECT_WRONG_COMP;
1246     remote << hello_local;
1247     remote.close();
1248     return false;
1249 
1250   } else {
1251     // If first process, create the new computation group
1252     if (compId == UniquePid(0,0,0)) {
1253       struct timeval tv;
1254       // Connection of new computation.
1255       compId = UniquePid(hello_remote.from.hostid(), client->virtualPid(),
1256                          hello_remote.from.time(),
1257                          hello_remote.from.computationGeneration());
1258 
1259       JASSERT(gettimeofday(&tv, NULL) == 0);
1260       // Get the resolution down to 100 mili seconds.
1261       curTimeStamp = (tv.tv_sec << 4) | (tv.tv_usec / (100*1000));
1262       numPeers = -1;
1263       JTRACE("First process connected.  Creating new computation group.")
1264         (compId);
1265     } else {
1266       JTRACE("New process connected")
1267         (hello_remote.from) (client->virtualPid());
1268     }
1269     hello_local.compGroup = compId;
1270     hello_local.coordTimeStamp = curTimeStamp;
1271     if (Util::strStartsWith(remoteIP, "127.")) {
1272       memcpy(&hello_local.ipAddr, &localhostIPAddr, sizeof localhostIPAddr);
1273     } else {
1274       memcpy(&hello_local.ipAddr, &sin->sin_addr, sizeof localhostIPAddr);
1275     }
1276     remote << hello_local;
1277   }
1278   return true;
1279 }
1280 
1281 bool DmtcpCoordinator::startCheckpoint()
1282 {
1283   uniqueCkptFilenames = false;
1284   ComputationStatus s = getStatus();
1285   if ( s.minimumState == WorkerState::RUNNING && s.minimumStateUnanimous
1286        && !workersRunningAndSuspendMsgSent )
1287   {
1288     time(&ckptTimeStamp);
1289     JTIMER_START ( checkpoint );
1290     _restartFilenames.clear();
1291     JNOTE ( "starting checkpoint, suspending all nodes" )( s.numPeers );
1292     compId.incrementGeneration();
1293     JNOTE("Incremented computationGeneration") (compId.computationGeneration());
1294     // Pass number of connected peers to all clients
1295     broadcastMessage(DMT_DO_SUSPEND);
1296 
1297     // Suspend Message has been sent but the workers are still in running
1298     // state.  If the coordinator receives another checkpoint request from user
1299     // at this point, it should fail.
1300     workersRunningAndSuspendMsgSent = true;
1301     return true;
1302   } else {
1303     if (s.numPeers > 0) {
1304       JTRACE ( "delaying checkpoint, workers not ready" ) ( s.minimumState )
1305              ( s.numPeers );
1306     }
1307     return false;
1308   }
1309 }
1310 
1311 void DmtcpCoordinator::broadcastMessage(DmtcpMessageType type, int numPeers)
1312 {
1313   DmtcpMessage msg;
1314   msg.type = type;
1315   msg.compGroup = compId;
1316   if (numPeers > 0) {
1317     msg.numPeers = numPeers;
1318   }
1319 
1320   if (msg.type == DMT_KILL_PEER && clients.size() > 0) {
1321     killInProgress = true;
1322   } else if (msg.type == DMT_DO_FD_LEADER_ELECTION) {
1323     // All the workers are in SUSPENDED state, now it is safe to reset
1324     // this flag.
1325     workersRunningAndSuspendMsgSent = false;
1326   }
1327 
1328   for (size_t i = 0; i < clients.size(); i++) {
1329     clients[i]->sock() << msg;
1330   }
1331   JTRACE ("sending message")( type );
1332 }
1333 
1334 DmtcpCoordinator::ComputationStatus DmtcpCoordinator::getStatus() const
1335 {
1336   ComputationStatus status;
1337   const static int INITIAL_MIN = WorkerState::_MAX;
1338   const static int INITIAL_MAX = WorkerState::UNKNOWN;
1339   int min = INITIAL_MIN;
1340   int max = INITIAL_MAX;
1341   int count = 0;
1342   bool unanimous = true;
1343   for (size_t i = 0; i < clients.size(); i++) {
1344     int cliState = clients[i]->state().value();
1345     count++;
1346     unanimous = unanimous && (min==cliState || min==INITIAL_MIN);
1347     if ( cliState < min ) min = cliState;
1348     if ( cliState > max ) max = cliState;
1349   }
1350 
1351   status.minimumState = ( min==INITIAL_MIN ? WorkerState::UNKNOWN
1352                           : (WorkerState::eWorkerState)min );
1353   if( status.minimumState == WorkerState::CHECKPOINTED &&
1354       isRestarting && count < numPeers ){
1355     JTRACE("minimal state counted as CHECKPOINTED but not all processes"
1356            " are connected yet.  So we wait.") ( numPeers ) ( count );
1357     status.minimumState = WorkerState::RESTARTING;
1358   }
1359   status.minimumStateUnanimous = unanimous;
1360 
1361   status.maximumState = ( max==INITIAL_MAX ? WorkerState::UNKNOWN
1362                           : (WorkerState::eWorkerState)max );
1363   status.numPeers = count;
1364   return status;
1365 }
1366 
1367 void DmtcpCoordinator::writeRestartScript()
1368 {
1369   ostringstream o;
1370   string uniqueFilename;
1371 
1372   o << string(ckptDir) << "/"
1373     << RESTART_SCRIPT_BASENAME << "_" << compId;
1374   if (uniqueCkptFilenames) {
1375     o << "_" << std::setw(5) << std::setfill('0') << compId.computationGeneration();
1376   }
1377   o << "." << RESTART_SCRIPT_EXT;
1378   uniqueFilename = o.str();
1379 
1380   const bool isSingleHost = (_restartFilenames.size() == 1);
1381 
1382   map< string, vector<string> >::const_iterator host;
1383   vector<string>::const_iterator file;
1384 
1385   char hostname[80];
1386   char timestamp[80];
1387   gethostname ( hostname, 80 );
1388 
1389   JTRACE ( "writing restart script" ) ( uniqueFilename );
1390 
1391   FILE* fp = fopen ( uniqueFilename.c_str(),"w" );
1392   JASSERT ( fp!=0 )(JASSERT_ERRNO)( uniqueFilename )
1393     .Text ( "failed to open file" );
1394 
1395   fprintf ( fp, "%s", theRestartScriptHeader );
1396   fprintf ( fp, "%s", theRestartScriptCheckLocal );
1397   fprintf ( fp, "%s", slurmHelperContactFunction );
1398   fprintf ( fp, "%s", theRestartScriptUsage );
1399 
1400   ctime_r(&ckptTimeStamp, timestamp);
1401   // Remove the trailing '\n'
1402   timestamp[strlen(timestamp) - 1] = '\0';
1403   fprintf ( fp, "ckpt_timestamp=\"%s\"\n\n", timestamp );
1404 
1405   fprintf ( fp, "coord_host=$" ENV_VAR_NAME_HOST "\n"
1406                 "if test -z \"$" ENV_VAR_NAME_HOST "\"; then\n"
1407                 "  coord_host=%s\nfi\n\n"
1408                 "coord_port=$" ENV_VAR_NAME_PORT "\n"
1409                 "if test -z \"$" ENV_VAR_NAME_PORT "\"; then\n"
1410                 "  coord_port=%d\nfi\n\n"
1411                 "checkpoint_interval=$" ENV_VAR_CKPT_INTR "\n"
1412                 "if test -z \"$" ENV_VAR_CKPT_INTR "\"; then\n"
1413                 "  checkpoint_interval=%d\nfi\n"
1414                 "export DMTCP_CHECKPOINT_INTERVAL=${checkpoint_interval}\n\n",
1415                 hostname, thePort, theCheckpointInterval );
1416 
1417   fprintf ( fp, "%s", theRestartScriptCmdlineArgHandler );
1418 
1419   fprintf ( fp, "dmt_rstr_cmd=%s/" DMTCP_RESTART_CMD "\n"
1420                 "which $dmt_rstr_cmd > /dev/null 2>&1"
1421                 " || dmt_rstr_cmd=" DMTCP_RESTART_CMD "\n"
1422                 "which $dmt_rstr_cmd > /dev/null 2>&1"
1423                 " || echo \"$0: $dmt_rstr_cmd not found\"\n"
1424                 "which $dmt_rstr_cmd > /dev/null 2>&1 || exit 1\n\n",
1425                 jalib::Filesystem::GetProgramDir().c_str());
1426 
1427   fprintf ( fp, "# Number of hosts in the computation = %zd\n"
1428                 "# Number of processes in the computation = %d\n\n",
1429                 _restartFilenames.size(), getStatus().numPeers );
1430 
1431   if ( isSingleHost ) {
1432     JTRACE ( "Single HOST" );
1433 
1434     host=_restartFilenames.begin();
1435     ostringstream o;
1436     for ( file=host->second.begin(); file!=host->second.end(); ++file ) {
1437       o << " " << *file;
1438     }
1439     fprintf ( fp, "given_ckpt_files=\"%s\"\n\n", o.str().c_str());
1440 
1441     fprintf ( fp, "%s", theRestartScriptSingleHostProcessing );
1442   }
1443   else
1444   {
1445     fprintf ( fp, "%s",
1446               "# SYNTAX:\n"
1447               "#  :: <HOST> :<MODE>: <CHECKPOINT_IMAGE> ...\n"
1448               "# Host names and filenames must not include \':\'\n"
1449               "# At most one fg (foreground) mode allowed; it must be last.\n"
1450               "# \'maybexterm\' and \'maybebg\' are set from <MODE>.\n");
1451 
1452     fprintf ( fp, "%s", "worker_ckpts=\'" );
1453     for ( host=_restartFilenames.begin(); host!=_restartFilenames.end(); ++host ) {
1454       fprintf ( fp, "\n :: %s :bg:", host->first.c_str() );
1455       for ( file=host->second.begin(); file!=host->second.end(); ++file ) {
1456         fprintf ( fp," %s", file->c_str() );
1457       }
1458     }
1459     fprintf ( fp, "%s", "\n\'\n\n" );
1460 
1461     fprintf( fp,  "# Check for resource manager\n"
1462                   "ibrun_path=$(which ibrun 2> /dev/null)\n"
1463                   "if [ ! -n \"$ibrun_path\" ]; then\n"
1464                   "  discover_rm_path=$(which dmtcp_discover_rm)\n"
1465                   "  if [ -n \"$discover_rm_path\" ]; then\n"
1466                   "    eval $(dmtcp_discover_rm -t)\n"
1467                   "    srun_path=$(which srun 2> /dev/null)\n"
1468                   "    llaunch=`which dmtcp_rm_loclaunch`\n"
1469                   "    if [ $RES_MANAGER = \"SLURM\" ] && [ -n \"$srun_path\" ]; then\n"
1470                   "      eval $(dmtcp_discover_rm -n \"$worker_ckpts\")\n"
1471                   "      if [ -n \"$DMTCP_DISCOVER_RM_ERROR\" ]; then\n"
1472                   "          echo \"Restart error: $DMTCP_DISCOVER_RM_ERROR\"\n"
1473                   "          echo \"Allocated resources: $manager_resources\"\n"
1474                   "          exit 0\n"
1475                   "      fi\n"
1476                   "      export DMTCP_REMLAUNCH_NODES=$DMTCP_REMLAUNCH_NODES\n"
1477                   "      bound=$(($DMTCP_REMLAUNCH_NODES - 1))\n"
1478                   "      for i in $(seq 0 $bound); do\n"
1479                   "        eval \"val=\\${DMTCP_REMLAUNCH_${i}_SLOTS}\"\n"
1480                   "        export DMTCP_REMLAUNCH_${i}_SLOTS=\"$val\"\n"
1481                   "        bound2=$(($val - 1))\n"
1482                   "        for j in $(seq 0 $bound2); do\n"
1483                   "          eval \"ckpts=\\${DMTCP_REMLAUNCH_${i}_${j}}\"\n"
1484                   "          export DMTCP_REMLAUNCH_${i}_${j}=\"$ckpts\"\n"
1485                   "        done\n"
1486                   "      done\n"
1487                   "      if [ \"$DMTCP_DISCOVER_PM_TYPE\" = \"HYDRA\" ]; then\n"
1488                   "        export DMTCP_SRUN_HELPER_SYNCFILE=`mktemp ./tmp.XXXXXXXXXX`\n"
1489                   "        rm $DMTCP_SRUN_HELPER_SYNCFILE\n"
1490                   "        dmtcp_srun_helper -r $srun_path \"$llaunch\"\n"
1491                   "        if [ ! -f $DMTCP_SRUN_HELPER_SYNCFILE ]; then\n"
1492                   "          echo \"Error launching application\"\n"
1493                   "          exit 1\n"
1494                   "        fi\n"
1495                   "        # export helper contact info\n"
1496                   "        . $DMTCP_SRUN_HELPER_SYNCFILE\n"
1497                   "        pass_slurm_helper_contact \"$DMTCP_LAUNCH_CKPTS\"\n"
1498                   "        rm $DMTCP_SRUN_HELPER_SYNCFILE\n"
1499                   "        dmtcp_restart --join --coord-host $DMTCP_COORD_HOST"
1500                               " --coord-port $DMTCP_COORD_PORT"
1501                               " $DMTCP_LAUNCH_CKPTS\n"
1502                   "      else\n"
1503                   "        DMTCP_REMLAUNCH_0_0=\"$DMTCP_REMLAUNCH_0_0"
1504                                                      " $DMTCP_LAUNCH_CKPTS\"\n"
1505                   "        $srun_path \"$llaunch\"\n"
1506                   "      fi\n"
1507                   "      exit 0\n"
1508                   "    elif [ $RES_MANAGER = \"TORQUE\" ]; then\n"
1509                   "      #eval $(dmtcp_discover_rm \"$worker_ckpts\")\n"
1510                   "      #if [ -n \"$new_worker_ckpts\" ]; then\n"
1511                   "      #  worker_ckpts=\"$new_worker_ckpts\"\n"
1512                   "      #fi\n"
1513                   "      eval $(dmtcp_discover_rm -n \"$worker_ckpts\")\n"
1514                   "      if [ -n \"$DMTCP_DISCOVER_RM_ERROR\" ]; then\n"
1515                   "          echo \"Restart error: $DMTCP_DISCOVER_RM_ERROR\"\n"
1516                   "          echo \"Allocated resources: $manager_resources\"\n"
1517                   "          exit 0\n"
1518                   "      fi\n"
1519                   "      arguments=\"PATH=$PATH DMTCP_COORD_HOST=$DMTCP_COORD_HOST"
1520                                       " DMTCP_COORD_PORT=$DMTCP_COORD_PORT\"\n"
1521                   "      arguments=$arguments\" DMTCP_CHECKPOINT_INTERVAL=$DMTCP_CHECKPOINT_INTERVAL\"\n"
1522                   "      arguments=$arguments\" DMTCP_TMPDIR=$DMTCP_TMPDIR\"\n"
1523                   "      arguments=$arguments\" DMTCP_REMLAUNCH_NODES=$DMTCP_REMLAUNCH_NODES\"\n"
1524                   "      bound=$(($DMTCP_REMLAUNCH_NODES - 1))\n"
1525                   "      for i in $(seq 0 $bound); do\n"
1526                   "        eval \"val=\\${DMTCP_REMLAUNCH_${i}_SLOTS}\"\n"
1527                   "        arguments=$arguments\" DMTCP_REMLAUNCH_${i}_SLOTS=\\\"$val\\\"\"\n"
1528                   "        bound2=$(($val - 1))\n"
1529                   "        for j in $(seq 0 $bound2); do\n"
1530                   "          eval \"ckpts=\\${DMTCP_REMLAUNCH_${i}_${j}}\"\n"
1531                   "          arguments=$arguments\" DMTCP_REMLAUNCH_${i}_${j}=\\\"$ckpts\\\"\"\n"
1532                   "        done\n"
1533                   "      done\n"
1534                   "      pbsdsh -u \"$llaunch\" \"$arguments\"\n"
1535                   "      exit 0\n"
1536                   "    fi\n"
1537                   "  fi\n"
1538                   "fi\n"
1539                   "\n\n"
1540              );
1541 
1542     fprintf ( fp, "%s", theRestartScriptMultiHostProcessing );
1543   }
1544 
1545   fclose ( fp );
1546   {
1547     string filename = RESTART_SCRIPT_BASENAME "." RESTART_SCRIPT_EXT;
1548     string dirname = jalib::Filesystem::DirName(uniqueFilename);
1549     int dirfd = open(dirname.c_str(), O_DIRECTORY | O_RDONLY);
1550     JASSERT(dirfd != -1) (dirname) (JASSERT_ERRNO);
1551 
1552     /* Set execute permission for user. */
1553     struct stat buf;
1554     JASSERT(stat(uniqueFilename.c_str(), &buf) == 0);
1555     JASSERT(chmod(uniqueFilename.c_str(), buf.st_mode | S_IXUSR) == 0);
1556     // Create a symlink from
1557     //   dmtcp_restart_script.sh -> dmtcp_restart_script_<curCompId>.sh
1558     unlink(filename.c_str());
1559     JTRACE("linking \"dmtcp_restart_script.sh\" filename to uniqueFilename")
1560       (filename) (dirname) (uniqueFilename);
1561     // FIXME:  Handle error case of symlink()
1562     JWARNING(symlinkat(uniqueFilename.c_str(), dirfd, filename.c_str()) == 0);
1563     JASSERT(close(dirfd) == 0);
1564   }
1565   _restartFilenames.clear();
1566 }
1567 
1568 static void signalHandler(int signum)
1569 {
1570   if (signum == SIGINT) {
1571     prog.handleUserCommand('q');
1572   } else if (signum == SIGALRM) {
1573     timerExpired = true;
1574   } else {
1575     JASSERT(false) .Text("Not reached");
1576   }
1577 }
1578 
1579 static void setupSignalHandlers()
1580 {
1581   struct sigaction action;
1582   sigemptyset(&action.sa_mask);
1583   action.sa_flags = 0;
1584   action.sa_handler = signalHandler;
1585 
1586   sigaction(SIGINT, &action, NULL);
1587   sigaction(SIGALRM, &action, NULL);
1588 }
1589 
// This code is also copied to ssh.cpp:updateCoordHost()
/*
 * Determine the coordinator's host name and IPv4 address and publish them
 * in the globals coordHostname and localhostIPAddr.
 *
 * The local host name is resolved with getaddrinfo() (IPv4 only).  Each
 * returned address is reverse-resolved with getnameinfo(); an address is
 * accepted when the reverse name and the host name agree on a common
 * prefix.  If several addresses qualify, the last one in the list wins.
 *
 * Outcomes:
 *   - match found:          localhostIPAddr holds the matching address.
 *   - lookup ok, no match:  localhostIPAddr stays all-zero and a warning
 *                           is emitted.
 *   - getaddrinfo() fails:  localhostIPAddr falls back to 127.0.0.1.
 * coordHostname is always set to the name returned by gethostname().
 */
static void calcLocalAddr()
{
  // HOST_NAME_MAX does not include the terminating NUL, and gethostname()
  // does not promise to NUL-terminate a truncated name, so reserve one
  // extra byte and terminate explicitly.
  char hostname[HOST_NAME_MAX + 1];
  JASSERT(gethostname(hostname, sizeof hostname) == 0) (JASSERT_ERRNO);
  hostname[sizeof hostname - 1] = '\0';

  struct addrinfo *result = NULL;
  struct addrinfo *res;
  int error;
  struct addrinfo hints;

  memset(&localhostIPAddr, 0, sizeof localhostIPAddr);

  // All hint fields that are not set below stay zero / NULL.
  memset(&hints, 0, sizeof(struct addrinfo));
  hints.ai_family = AF_INET;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE;

  /* resolve the domain name into a list of addresses */
  error = getaddrinfo(hostname, NULL, &hints, &result);
  if (error == 0) {
    /* loop over all returned results and do inverse lookup */
    bool success = false;
    for (res = result; res != NULL; res = res->ai_next) {
      char name[NI_MAXHOST] = "";
      struct sockaddr_in *s = (struct sockaddr_in*) res->ai_addr;

      error = getnameinfo(res->ai_addr, res->ai_addrlen, name, NI_MAXHOST, NULL, 0, 0);
      if (error != 0) {
        JTRACE("getnameinfo() failed.") (gai_strerror(error));
        continue;
      }
      if (Util::strStartsWith(name, hostname) ||
          Util::strStartsWith(hostname, name)) {
        JASSERT(sizeof localhostIPAddr == sizeof s->sin_addr);
        success = true;
        memcpy(&localhostIPAddr, &s->sin_addr, sizeof s->sin_addr);
      }
    }

    // The list returned by getaddrinfo() is heap-allocated and must be
    // released by the caller.
    freeaddrinfo(result);

    // JWARNING takes a condition and reports when it is false; the message
    // itself must therefore be attached with .Text() rather than passed as
    // the (always-true) condition.
    JWARNING(success) (hostname)
      .Text("Failed to find coordinator IP address.  DMTCP may fail.");
  } else {
    if (error == EAI_SYSTEM) {
      perror("getaddrinfo");
    } else {
      JTRACE("Error in getaddrinfo") (gai_strerror(error));
    }
    inet_aton("127.0.0.1", &localhostIPAddr);
  }
  coordHostname = hostname;
}
1645 
1646 static void resetCkptTimer()
1647 {
1648   alarm(theCheckpointInterval);
1649 }
1650 
1651 void DmtcpCoordinator::updateCheckpointInterval(uint32_t interval)
1652 {
1653   if ( interval != DMTCPMESSAGE_SAME_CKPT_INTERVAL &&
1654       interval != theCheckpointInterval) {
1655     int oldInterval = theCheckpointInterval;
1656     theCheckpointInterval = interval;
1657     JNOTE ( "CheckpointInterval updated (for this computation only)" )
1658       ( oldInterval ) ( theCheckpointInterval );
1659     resetCkptTimer();
1660   }
1661 }
1662 
1663 void DmtcpCoordinator::eventLoop(bool daemon)
1664 {
1665   struct epoll_event ev;
1666   epollFd = epoll_create(MAX_EVENTS);
1667   JASSERT(epollFd != -1) (JASSERT_ERRNO);
1668 
1669   ev.events = EPOLLIN;
1670   ev.data.ptr = listenSock;
1671   JASSERT(epoll_ctl(epollFd, EPOLL_CTL_ADD, listenSock->sockfd(), &ev) != -1)
1672     (JASSERT_ERRNO);
1673 
1674   if (!daemon &&
1675       // epoll_ctl below fails if STDIN is pointing to /dev/null.
1676       // Not sure why.
1677       jalib::Filesystem::GetDeviceName(0) != "/dev/null" &&
1678       jalib::Filesystem::GetDeviceName(0) != "/dev/zero" &&
1679       jalib::Filesystem::GetDeviceName(0) != "/dev/random") {
1680     ev.events = EPOLLIN;
1681 #ifdef EPOLLRDHUP
1682     ev.events |= EPOLLRDHUP;
1683 #endif
1684     ev.data.ptr = (void*) STDIN_FILENO;
1685     JASSERT(epoll_ctl(epollFd, EPOLL_CTL_ADD, STDIN_FILENO, &ev) != -1)
1686       (JASSERT_ERRNO);
1687   }
1688 
1689   while (true) {
1690     // Wait until either there is some activity on client sockets, or the timer
1691     // has expired.
1692     int nfds = epoll_wait(epollFd, events, MAX_EVENTS, -1);
1693 
1694     // The ckpt timer has expired; it's time to checkpoint.
1695     if (nfds == -1 && errno == EINTR && timerExpired) {
1696       timerExpired = false;
1697       startCheckpoint();
1698       continue;
1699     }
1700 
1701     JASSERT(nfds != -1) (JASSERT_ERRNO);
1702 
1703     for (int n = 0; n < nfds; ++n) {
1704       void *ptr = events[n].data.ptr;
1705       if ((events[n].events & EPOLLHUP) ||
1706 #ifdef EPOLLRDHUP
1707           (events[n].events & EPOLLRDHUP) ||
1708 #endif
1709           (events[n].events & EPOLLERR)) {
1710         JASSERT(ptr != listenSock);
1711         if (ptr == (void*) STDIN_FILENO) {
1712           JASSERT(epoll_ctl(epollFd, EPOLL_CTL_DEL, STDIN_FILENO, &ev) != -1)
1713             (JASSERT_ERRNO);
1714           close(STDIN_FD);
1715         } else {
1716           onDisconnect((CoordClient*)ptr);
1717         }
1718       } else if (events[n].events & EPOLLIN) {
1719         if (ptr == (void*) listenSock) {
1720           onConnect();
1721         } else if (ptr == (void*) STDIN_FILENO) {
1722           char buf[1];
1723           int ret = Util::readAll(STDIN_FD, buf, sizeof(buf));
1724           JASSERT(ret != -1) (JASSERT_ERRNO);
1725           if (ret > 0) {
1726             handleUserCommand(buf[0]);
1727           } else {
1728             JNOTE("closing stdin");
1729             JASSERT(epoll_ctl(epollFd, EPOLL_CTL_DEL, STDIN_FILENO, &ev) != -1)
1730               (JASSERT_ERRNO);
1731             close(STDIN_FD);
1732           }
1733         } else {
1734           onData((CoordClient*)ptr);
1735         }
1736       }
1737     }
1738   }
1739 }
1740 
1741 void DmtcpCoordinator::addDataSocket(CoordClient *client)
1742 {
1743   struct epoll_event ev;
1744 
1745 #ifdef EPOLLRDHUP
1746   ev.events = EPOLLIN | EPOLLRDHUP;
1747 #else
1748   ev.events = EPOLLIN;
1749 #endif
1750   ev.data.ptr = client;
1751   JASSERT(epoll_ctl(epollFd, EPOLL_CTL_ADD, client->sock().sockfd(), &ev) != -1)
1752     (JASSERT_ERRNO);
1753 }
1754 
1755 #define shift argc--; argv++
1756 
1757 int main ( int argc, char** argv )
1758 {
1759   initializeJalib();
1760 
1761   //parse port
1762   thePort = DEFAULT_PORT;
1763   const char* portStr = getenv( ENV_VAR_NAME_PORT );
1764   if ( portStr == NULL ) portStr = getenv("DMTCP_PORT"); // deprecated
1765   if ( portStr != NULL ) thePort = jalib::StringToInt( portStr );
1766 
1767   bool daemon = false;
1768 
1769   char * tmpdir_arg = NULL;
1770 
1771   shift;
1772   while(argc > 0){
1773     string s = argv[0];
1774     if(s=="-h" || s=="--help"){
1775       printf("%s", theUsage);
1776       return 1;
1777     } else if ((s=="--version") && argc==1){
1778       printf("%s", DMTCP_VERSION_AND_COPYRIGHT_INFO);
1779       return 1;
1780     }else if(s == "-q" || s == "--quiet"){
1781       // TODO(kapil): Ignored for now. Remove in later versions.
1782       shift;
1783     }else if(s=="--exit-on-last"){
1784       exitOnLast = true;
1785       shift;
1786     }else if(s=="--exit-after-ckpt"){
1787       exitAfterCkpt = true;
1788       shift;
1789     }else if(s=="--daemon"){
1790       daemon = true;
1791       shift;
1792     } else if (s == "-i" || s == "--interval") {
1793       setenv(ENV_VAR_CKPT_INTR, argv[1], 1);
1794       shift; shift;
1795     } else if (argv[0][0] == '-' && argv[0][1] == 'i' &&
1796                isdigit(argv[0][2])) { // else if -i5, for example
1797       setenv(ENV_VAR_CKPT_INTR, argv[0]+2, 1);
1798       shift;
1799     } else if (argc>1 && (s == "-p" || s == "--port" || s == "--coord-port")) {
1800       thePort = jalib::StringToInt( argv[1] );
1801       shift; shift;
1802     } else if (argv[0][0] == '-' && argv[0][1] == 'p' &&
1803                isdigit(argv[0][2])) { // else if -p0, for example
1804       thePort = jalib::StringToInt( argv[0]+2 );
1805       shift;
1806     }else if(argc>1 && s == "--port-file"){
1807       thePortFile = argv[1];
1808       shift; shift;
1809     }else if(argc>1 && (s == "-c" || s == "--ckptdir")){
1810       setenv(ENV_VAR_CHECKPOINT_DIR, argv[1], 1);
1811       shift; shift;
1812     }else if(argc>1 && (s == "-t" || s == "--tmpdir")){
1813       tmpdir_arg = argv[1];
1814       shift; shift;
1815     }else if(argc == 1){ //last arg can be port
1816       char *endptr;
1817       long x = strtol(argv[0], &endptr, 10);
1818       if ((ssize_t)strlen(argv[0]) != endptr - argv[0]) {
1819         fprintf(stderr, theUsage, DEFAULT_PORT);
1820         return 1;
1821       } else {
1822         thePort = jalib::StringToInt( argv[0] );
1823         shift;
1824       }
1825       x++, x--; // to suppress unused variable warning
1826     }else{
1827       fprintf(stderr, theUsage, DEFAULT_PORT);
1828       return 1;
1829     }
1830   }
1831 
1832   tmpDir = Util::calcTmpDir(tmpdir_arg);
1833   Util::initializeLogFile(tmpDir);
1834 
1835   JTRACE ( "New DMTCP coordinator starting." )
1836     ( UniquePid::ThisProcess() );
1837 
1838   if ( thePort < 0 )
1839   {
1840     fprintf(stderr, theUsage, DEFAULT_PORT);
1841     return 1;
1842   }
1843 
1844   calcLocalAddr();
1845 
1846   if (getenv(ENV_VAR_CHECKPOINT_DIR) != NULL) {
1847     ckptDir = getenv(ENV_VAR_CHECKPOINT_DIR);
1848   } else {
1849     ckptDir = get_current_dir_name();
1850   }
1851 
1852   /*Test if the listener socket is already open*/
1853   if ( fcntl(PROTECTED_COORD_FD, F_GETFD) != -1 ) {
1854     listenSock = new jalib::JServerSocket ( PROTECTED_COORD_FD );
1855     JASSERT ( listenSock->port() != -1 ) .Text ( "Invalid listener socket" );
1856     JTRACE ( "Using already created listener socker" ) ( listenSock->port() );
1857   } else {
1858 
1859     errno = 0;
1860     listenSock = new jalib::JServerSocket(jalib::JSockAddr::ANY, thePort, 128);
1861     JASSERT ( listenSock->isValid() ) ( thePort ) ( JASSERT_ERRNO )
1862       .Text ( "Failed to create listen socket."
1863        "\nIf msg is \"Address already in use\", this may be an old coordinator."
1864        "\nKill default coordinator and try again:  dmtcp_command -q"
1865        "\nIf that fails, \"pkill -9 dmtcp_coord\","
1866        " and try again in a minute or so." );
1867   }
1868 
1869   thePort = listenSock->port();
1870   if (!thePortFile.empty()) {
1871     Util::writeCoordPortToFile(thePort, thePortFile.c_str());
1872   }
1873 
1874   //parse checkpoint interval
1875   const char* interval = getenv ( ENV_VAR_CKPT_INTR );
1876   if ( interval != NULL ) {
1877     theDefaultCheckpointInterval = jalib::StringToInt ( interval );
1878     theCheckpointInterval = theDefaultCheckpointInterval;
1879   }
1880 
1881 #if 0
1882   JASSERT_STDERR <<
1883     "dmtcp_coordinator starting..." <<
1884     "\n    Port: " << thePort <<
1885     "\n    Checkpoint Interval: ";
1886   if(theCheckpointInterval==0)
1887     JASSERT_STDERR << "disabled (checkpoint manually instead)";
1888   else
1889     JASSERT_STDERR << theCheckpointInterval;
1890   JASSERT_STDERR  <<
1891     "\n    Exit on last client: " << exitOnLast << "\n";
1892 #else
1893 
1894   fprintf(stderr, "dmtcp_coordinator starting..."
1895           "\n    Host: %s (%s)"
1896           "\n    Port: %d"
1897           "\n    Checkpoint Interval: ",
1898           coordHostname.c_str(), inet_ntoa(localhostIPAddr), thePort);
1899   if(theCheckpointInterval==0)
1900     fprintf(stderr, "disabled (checkpoint manually instead)");
1901   else
1902     fprintf(stderr, "%d", theCheckpointInterval);
1903   fprintf(stderr, "\n    Exit on last client: %d\n", exitOnLast);
1904 #endif
1905 
1906   if (daemon) {
1907     JASSERT_STDERR  << "Backgrounding...\n";
1908     int fd = open("/dev/null", O_RDWR);
1909     JASSERT(dup2(fd, STDIN_FILENO) == STDIN_FILENO);
1910     JASSERT(dup2(fd, STDOUT_FILENO) == STDOUT_FILENO);
1911     JASSERT(dup2(fd, STDERR_FILENO) == STDERR_FILENO);
1912     JASSERT_CLOSE_STDERR();
1913     if (fd > STDERR_FILENO) {
1914       close(fd);
1915     }
1916 
1917     if (fork() > 0) {
1918       JTRACE ( "Parent Exiting after fork()" );
1919       exit(0);
1920     }
1921     //pid_t sid = setsid();
1922   } else {
1923     JASSERT_STDERR  <<
1924       "Type '?' for help." <<
1925       "\n\n";
1926   }
1927 
1928   /* We set up the signal handler for SIGINT and SIGALRM.
1929    * SIGINT is used to send DMT_KILL_PEER message to all the connected peers
1930    * before exiting.
1931    * SIGALRM is used for interval checkpointing.
1932    */
1933   setupSignalHandlers();
1934 
1935   /* If the coordinator was started transparently by dmtcp_launch, then we
1936    * want to block signals, such as SIGINT.  To see why this is important:
1937    * % gdb dmtcp_launch a.out
1938    * (gdb) run
1939    * ^C   # Stop gdb to get its attention, and continue debugging.
1940    * # The above scenario causes the SIGINT to go to a.out and its child,
1941    * # the dmtcp_coordinator.  The coord then triggers the SIGINT handler,
1942    * # which sends DMT_KILL_PEER to kill a.out.
1943    */
1944   if ( exitOnLast && daemon ) {
1945     sigset_t set;
1946     sigfillset(&set);
1947     // sigprocmask is only per-thread; but the coordinator is single-threaded.
1948     sigprocmask(SIG_BLOCK, &set, NULL);
1949   }
1950 
1951   prog.eventLoop(daemon);
1952   return 0;
1953 }

/* [<][>][^][v][top][bottom][index][help] */