root/dmtcp_coordinator.cpp
/* [<][>][^][v][top][bottom][index][help] */
DEFINITIONS
This source file includes following definitions.
- readProcessInfo
- getNewVirtualPid
- handleUserCommand
- printStatus
- updateMinimumState
- onData
- removeStaleSharedAreaFile
- preExitCleanup
- onDisconnect
- initializeComputation
- onConnect
- processDmtUserCmd
- validateRestartingWorkerProcess
- validateNewWorkerProcess
- startCheckpoint
- broadcastMessage
- getStatus
- writeRestartScript
- signalHandler
- setupSignalHandlers
- calcLocalAddr
- resetCkptTimer
- updateCheckpointInterval
- eventLoop
- addDataSocket
- main
1 /****************************************************************************
2 * Copyright (C) 2006-2013 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3 * jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu *
4 * *
5 * This file is part of DMTCP. *
6 * *
7 * DMTCP is free software: you can redistribute it and/or *
8 * modify it under the terms of the GNU Lesser General Public License as *
9 * published by the Free Software Foundation, either version 3 of the *
10 * License, or (at your option) any later version. *
11 * *
12 * DMTCP is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU Lesser General Public License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public *
18 * License along with DMTCP:dmtcp/src. If not, see *
19 * <http://www.gnu.org/licenses/>. *
20 ****************************************************************************/
21
22 /****************************************************************************
23 * Coordinator code logic: *
24 * main calls eventLoop, a top-level event loop. *
25 * eventLoop calls: onConnect, onData, onDisconnect, startCheckpoint *
26 * when client or dmtcp_command talks to coordinator. *
27 * onConnect called on msg at listener port. It passes control to: *
28 * handleUserCommand, which takes single char arg ('s', 'c', 'k', 'q', ...)*
29 * handleUserCommand calls broadcastMessage to send data back *
30 * any message sent by broadcastMessage takes effect only on returning *
31 * back up to the top-level eventLoop *
32 * Hence, even for checkpoint, handleUserCommand just changes state, *
33 * broadcasts an initial checkpoint command, and then returns to top *
34 * level. Replies from clients then drive further state changes. *
35 * The prefix command 'b' (blocking) from dmtcp_command modifies behavior *
36 * of 'c' so that the reply to dmtcp_command happens only when clients *
37 * are back in RUNNING state. *
38 * onData called when a message arrives at a client's port. It either *
39 * processes a per-client special request, or continues the protocol *
40 * for a checkpoint or restart sequence (see below). *
41 * *
42 * updateMinimumState() is responsible for keeping track of states. *
43 * The coordinator keeps a ComputationStatus, with minimumState and *
44 * maximumState for states of all workers, accessed through getStatus() *
45 * or through minimumState() *
46 * The states for a worker (client) are: *
47 * Checkpoint: RUNNING -> SUSPENDED -> FD_LEADER_ELECTION -> DRAINED *
48 * -> CHECKPOINTED -> NAME_SERVICE_DATA_REGISTERED *
49 * -> DONE_QUERYING -> REFILLED -> RUNNING *
50 * Restart: RESTARTING -> CHECKPOINTED -> NAME_SERVICE_DATA_REGISTERED *
51 * -> DONE_QUERYING -> REFILLED -> RUNNING *
52 * If debugging, set gdb breakpoint on: *
53 * DmtcpCoordinator::onConnect *
54 * DmtcpCoordinator::onData *
55 * DmtcpCoordinator::handleUserCommand *
56 * DmtcpCoordinator::broadcastMessage *
57 ****************************************************************************/
58
59 #include <stdlib.h>
60 #include "dmtcp_coordinator.h"
61 #include "constants.h"
62 #include "protectedfds.h"
63 #include "dmtcpmessagetypes.h"
64 #include "lookup_service.h"
65 #include "syscallwrappers.h"
66 #include "util.h"
67 #include "../jalib/jconvert.h"
68 #include "../jalib/jtimer.h"
69 #include "../jalib/jfilesystem.h"
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <sys/stat.h>
73 #include <algorithm>
74 #include <iomanip>
75 #include <sys/wait.h>
76 #include <sys/types.h>
77 #include <sys/stat.h>
78 #include <netdb.h>
79 #include <arpa/inet.h>
80 #include <fcntl.h>
81 #undef min
82 #undef max
83
84 #define BINARY_NAME "dmtcp_coordinator"
85
86 using namespace dmtcp;
87
// Summary of the interactive single-character commands; handleUserCommand()
// prints it for 'h', 'H' and '?'.
static const char* theHelpMessage =
  "COMMANDS:\n"
  " l : List connected nodes\n"
  " s : Print status message\n"
  " c : Checkpoint all nodes\n"
  " i : Print current checkpoint interval\n"
  " (To change checkpoint interval, use dmtcp_command)\n"
  " k : Kill all nodes\n"
  " q : Kill all nodes and quit\n"
  " ? : Show this message\n"
  "\n";
100
101 static const char* theUsage =
102 "Usage: dmtcp_coordinator [OPTIONS] [port]\n"
103 "Coordinates checkpoints between multiple processes.\n\n"
104 "Options:\n"
105 " -p, --coord-port PORT_NUM (environment variable DMTCP_COORD_PORT)\n"
106 " Port to listen on (default: 7779)\n"
107 " --port-file filename\n"
108 " File to write listener port number.\n"
109 " (Useful with '--port 0', which is used to assign a random port)\n"
110 " --ckptdir (environment variable DMTCP_CHECKPOINT_DIR):\n"
111 " Directory to store dmtcp_restart_script.sh (default: ./)\n"
112 " --tmpdir (environment variable DMTCP_TMPDIR):\n"
113 " Directory to store temporary files (default: env var TMDPIR or /tmp)\n"
114 " --exit-on-last\n"
115 " Exit automatically when last client disconnects\n"
116 " --exit-after-ckpt\n"
117 " Exit automatically after checkpoint is created\n"
118 " --daemon\n"
119 " Run silently in the background after detaching from the parent process.\n"
120 " -i, --interval (environment variable DMTCP_CHECKPOINT_INTERVAL):\n"
121 " Time in seconds between automatic checkpoints\n"
122 " (default: 0, disabled)\n"
123 " --help:\n"
124 " Print this message and exit.\n"
125 " --version:\n"
126 " Print version information and exit.\n"
127 "\n"
128 "COMMANDS:\n"
129 " type '?<return>' at runtime for list\n"
130 "\n"
131 HELP_AND_CONTACT_INFO
132 "\n"
133 ;
134
135
// Opening lines of the generated restart script: the bash shebang, job
// control, and a comment block of editing hints for the user.
static const char* theRestartScriptHeader =
  "#!/bin/bash\n\n"
  "set -m # turn on job control\n\n"
  "#This script launches all the restarts in the background.\n"
  "#Suggestions for editing:\n"
  "# 1. For those processes executing on the localhost, remove\n"
  "# 'ssh <hostname> from the start of the line.\n"
  "# 2. If using ssh, verify that ssh does not require passwords or other\n"
  "# prompts.\n"
  "# 3. Verify that the dmtcp_restart command is in your path on all hosts,\n"
  "# otherwise set the dmt_rstr_cmd appropriately.\n"
  "# 4. Verify DMTCP_COORD_HOST and DMTCP_COORD_PORT match the location of\n"
  "# the dmtcp_coordinator. If necessary, add\n"
  "# 'DMTCP_COORD_PORT=<dmtcp_coordinator port>' after\n"
  "# 'DMTCP_COORD_HOST=<...>'.\n"
  "# 5. Remove the '&' from a line if that process reads STDIN.\n"
  "# If multiple processes read STDIN then prefix the line with\n"
  "# 'xterm -hold -e' and put '&' at the end of the line.\n"
  "# 6. Processes on same host can be restarted with single dmtcp_restart\n"
  "# command.\n\n\n";
157
158
// Shell function check_local() for the generated restart script.  It resolves
// the host name passed as $1 to an IP address (gethostip, then nslookup, then
// getent) and sets is_local_node to 1 when `ifconfig -a` lists that address,
// otherwise to 0.  The script exits with status 1 if no address is found.
static const char* theRestartScriptCheckLocal =
  "check_local()\n"
  "{\n"
  " worker_host=$1\n"
  " unset is_local_node\n"
  " worker_ip=$(gethostip -d $worker_host 2> /dev/null)\n"
  " if [ -z \"$worker_ip\" ]; then\n"
  " worker_ip=$(nslookup $worker_host | grep -A1 'Name:' | grep 'Address:' | sed -e 's/Address://' -e 's/ //' -e 's/ //')\n"
  " fi\n"
  " if [ -z \"$worker_ip\" ]; then\n"
  " worker_ip=$(getent ahosts $worker_host |grep \"^[0-9]\\+\\.[0-9]\\+\\.[0-9]\\+\\.[0-9]\\+ *STREAM\" | cut -d' ' -f1)\n"
  " fi\n"
  " if [ -z \"$worker_ip\" ]; then\n"
  " echo Could not find ip-address for $worker_host. Exiting...\n"
  " exit 1\n"
  " fi\n"
  " ifconfig_path=$(which ifconfig)\n"
  " if [ -z \"$ifconfig_path\" ]; then\n"
  " ifconfig_path=\"/sbin/ifconfig\"\n"
  " fi\n"
  " output=$($ifconfig_path -a | grep \"inet addr:.*${worker_ip} .*Bcast\")\n"
  " if [ -n \"$output\" ]; then\n"
  " is_local_node=1\n"
  " else\n"
  " is_local_node=0\n"
  " fi\n"
  "}\n\n\n";
186
// Shell function pass_slurm_helper_contact() for the generated restart script.
// For every checkpoint file named in $1 it appends the current
// DMTCP_SRUN_HELPER_ADDR setting to a slurm_env_* file in the per-user DMTCP
// temp directory, creating that directory first if it does not exist.
static const char *slurmHelperContactFunction =
  "pass_slurm_helper_contact()\n"
  "{\n"
  " LOCAL_FILES=\"$1\"\n"
  " # Create temp directory if needed\n"
  " if [ -n \"$DMTCP_TMPDIR\" ]; then\n"
  " CURRENT_TMPDIR=$DMTCP_TMPDIR/dmtcp-`whoami`@`hostname`\n"
  " elif [ -n \"$TMPDIR\" ]; then\n"
  " CURRENT_TMPDIR=$TMPDIR/dmtcp-`whoami`@`hostname`\n"
  " else\n"
  " CURRENT_TMPDIR=/tmp/dmtcp-`whoami`@`hostname`\n"
  " fi\n"
  " if [ ! -d \"$CURRENT_TMPDIR\" ]; then\n"
  " mkdir -p $CURRENT_TMPDIR\n"
  " fi\n"
  " # Create files with SLURM environment\n"
  " for CKPT_FILE in $LOCAL_FILES; do\n"
  " SUFFIX=${CKPT_FILE%%.dmtcp}\n"
  " SLURM_ENV_FILE=$CURRENT_TMPDIR/slurm_env_${SUFFIX##*_}\n"
  " echo \"DMTCP_SRUN_HELPER_ADDR=$DMTCP_SRUN_HELPER_ADDR\" >> $SLURM_ENV_FILE\n"
  " done\n"
  "}\n\n\n";
209
// Shell assignment of usage_str for the generated restart script; the script
// echoes it for --help and on option errors.
// Fixes to the user-facing text: "$TMDPIR" -> "$TMPDIR" and
// "that create it" -> "that created it".
// NOTE(review): the text advertises -d for both --ckptdir and --restartdir;
// that matches the option patterns in theRestartScriptCmdlineArgHandler but is
// ambiguous for users -- confirm the intended short option.
static const char* theRestartScriptUsage =
  "usage_str='USAGE:\n"
  " dmtcp_restart_script.sh [OPTIONS]\n\n"
  "OPTIONS:\n"
  " --coord-host, -h, (environment variable DMTCP_COORD_HOST):\n"
  " Hostname where dmtcp_coordinator is running\n"
  " --coord-port, -p, (environment variable DMTCP_COORD_PORT):\n"
  " Port where dmtcp_coordinator is running\n"
  " --hostfile <arg0> :\n"
  " Provide a hostfile (One host per line, \"#\" indicates comments)\n"
  " --ckptdir, -d, (environment variable DMTCP_CHECKPOINT_DIR):\n"
  " Directory to store checkpoint images\n"
  " (default: use the same directory used in previous checkpoint)\n"
  " --restartdir, -d, (environment variable DMTCP_RESTART_DIR):\n"
  " Directory to read checkpoint images from\n"
  " --tmpdir, -t, (environment variable DMTCP_TMPDIR):\n"
  " Directory to store temporary files (default: $TMPDIR or /tmp)\n"
  " --no-strict-uid-checking:\n"
  " Disable uid checking for the checkpoint image. This allows the\n"
  " checkpoint image to be restarted by a different user than the one\n"
  " that created it. (environment variable DMTCP_DISABLE_UID_CHECKING)\n"
  " --interval, -i, (environment variable DMTCP_CHECKPOINT_INTERVAL):\n"
  " Time in seconds between automatic checkpoints\n"
  " (Default: Use pre-checkpoint value)\n"
  " --help:\n"
  " Print this message and exit.\'\n"
  "\n\n"
;
238
// Option-parsing loop of the generated restart script.  It consumes the
// script's arguments and sets coord_host, coord_port, hostfile,
// DMTCP_RESTART_DIR, DMTCP_CKPT_DIR, DMTCP_TMPDIR, noStrictUidChecking and
// checkpoint_interval; --help or an unknown option prints $usage_str and exits.
//
// NOTE(review): "-d" appears in both the --restartdir and --ckptdir patterns.
// A shell `case` takes the first matching pattern, so "-d" always selects
// --restartdir.
// NOTE(review): the trailing `elif [ $1 = "--help" ]` and `else` branches
// cannot be reached, because `[ $# -ge 1 ]` is always true inside
// `while [ $# -gt 0 ]`.
static const char* theRestartScriptCmdlineArgHandler =
  "if [ $# -gt 0 ]; then\n"
  " while [ $# -gt 0 ]\n"
  " do\n"
  " if [ $1 = \"--help\" ]; then\n"
  " echo \"$usage_str\"\n"
  " exit\n"
  " elif [ $# -ge 1 ]; then\n"
  " case \"$1\" in\n"
  " --coord-host|--host|-h)\n"
  " coord_host=\"$2\"\n"
  " shift; shift;;\n"
  " --coord-port|--port|-p)\n"
  " coord_port=\"$2\"\n"
  " shift; shift;;\n"
  " --hostfile)\n"
  " hostfile=\"$2\"\n"
  " if [ ! -f \"$hostfile\" ]; then\n"
  " echo \"ERROR: hostfile $hostfile not found\"\n"
  " exit\n"
  " fi\n"
  " shift; shift;;\n"
  " --restartdir|-d)\n"
  " DMTCP_RESTART_DIR=$2\n"
  " shift; shift;;\n"
  " --ckptdir|-d)\n"
  " DMTCP_CKPT_DIR=$2\n"
  " shift; shift;;\n"
  " --tmpdir|-t)\n"
  " DMTCP_TMPDIR=$2\n"
  " shift; shift;;\n"
  " --no-strict-uid-checking)\n"
  " noStrictUidChecking=\"--no-strict-uid-checking\"\n"
  " shift;;\n"
  " --interval|-i)\n"
  " checkpoint_interval=$2\n"
  " shift; shift;;\n"
  " *)\n"
  " echo \"$0: unrecognized option \'$1\'. See correct usage below\"\n"
  " echo \"$usage_str\"\n"
  " exit;;\n"
  " esac\n"
  " elif [ $1 = \"--help\" ]; then\n"
  " echo \"$usage_str\"\n"
  " exit\n"
  " else\n"
  " echo \"$0: Incorrect usage. See correct usage below\"\n"
  " echo\n"
  " echo \"$usage_str\"\n"
  " exit\n"
  " fi\n"
  " done\n"
  "fi\n\n";
293
// Tail of the generated restart script for the single-host case.  It rebases
// the checkpoint file names onto $DMTCP_RESTART_DIR when that is set, builds
// the coordinator / tmpdir / ckptdir option strings, and finally exec's
// $dmt_rstr_cmd with them.
static const char* theRestartScriptSingleHostProcessing =
  "ckpt_files=\"\"\n"
  "if [ ! -z \"$DMTCP_RESTART_DIR\" ]; then\n"
  " for tmp in $given_ckpt_files; do\n"
  " ckpt_files=\"$DMTCP_RESTART_DIR/$(basename $tmp) $ckpt_files\"\n"
  " done\n"
  "else\n"
  " ckpt_files=$given_ckpt_files\n"
  "fi\n\n"

  "coordinator_info=\"--coord-host $coord_host --coord-port $coord_port\"\n"

  "tmpdir=\n"
  "if [ ! -z \"$DMTCP_TMPDIR\" ]; then\n"
  " tmpdir=\"--tmpdir $DMTCP_TMPDIR\"\n"
  "fi\n\n"

  "ckpt_dir=\n"
  "if [ ! -z \"$DMTCP_CKPT_DIR\" ]; then\n"
  " ckpt_dir=\"--ckptdir $DMTCP_CKPT_DIR\"\n"
  "fi\n\n"

  "exec $dmt_rstr_cmd $coordinator_info $ckpt_dir \\\n"
  " $maybejoin --interval \"$checkpoint_interval\" $tmpdir $noStrictUidChecking \\\n"
  " $ckpt_files\n";
320
// Tail of the generated restart script for the multi-host case.
//
// The script splits $worker_ckpts into per-host groups (host, restart mode,
// checkpoint files), optionally replaces the host list with the contents of
// $hostfile, and then loops over the hosts:
//   - a host that check_local reports as local (or the only host) has its
//     files remembered in localhost_ckpt_files_group;
//   - any other host is restarted over ssh, in the foreground or (mode "bg")
//     in the background.
// Finally the local group, if any, is restarted via exec; otherwise the script
// waits for the background jobs.
//
// Fix: the foreground ssh branch used to emit "--cord-port", a misspelling of
// the "--coord-port" option that every other invocation in this script uses.
static const char* theRestartScriptMultiHostProcessing =
  "worker_ckpts_regexp=\\\n"
  "\'[^:]*::[ \\t\\n]*\\([^ \\t\\n]\\+\\)[ \\t\\n]*:\\([a-z]\\+\\):[ \\t\\n]*\\([^:]\\+\\)\'\n\n"

  "worker_hosts=$(\\\n"
  " echo $worker_ckpts | sed -e \'s/\'\"$worker_ckpts_regexp\"\'/\\1 /g\')\n"
  "restart_modes=$(\\\n"
  " echo $worker_ckpts | sed -e \'s/\'\"$worker_ckpts_regexp\"\'/: \\2/g\')\n"
  "ckpt_files_groups=$(\\\n"
  " echo $worker_ckpts | sed -e \'s/\'\"$worker_ckpts_regexp\"\'/: \\3/g\')\n"
  "\n"
  "if [ ! -z \"$hostfile\" ]; then\n"
  " worker_hosts=$(\\\n"
  " cat \"$hostfile\" | sed -e \'s/#.*//\' -e \'s/[ \\t\\r]*//\' -e \'/^$/ d\')\n"
  "fi\n\n"

  "localhost_ckpt_files_group=\n\n"

  "num_worker_hosts=$(echo $worker_hosts | wc -w)\n\n"

  "maybejoin=\n"
  "if [ \"$num_worker_hosts\" != \"1\" ]; then\n"
  " maybejoin='--join'\n"
  "fi\n\n"

  "for worker_host in $worker_hosts\n"
  "do\n\n"
  " ckpt_files_group=$(\\\n"
  " echo $ckpt_files_groups | sed -e \'s/[^:]*:[ \\t\\n]*\\([^:]*\\).*/\\1/\')\n"
  " ckpt_files_groups=$(echo $ckpt_files_groups | sed -e \'s/[^:]*:[^:]*//\')\n"
  "\n"
  " mode=$(echo $restart_modes | sed -e \'s/[^:]*:[ \\t\\n]*\\([^:]*\\).*/\\1/\')\n"
  " restart_modes=$(echo $restart_modes | sed -e \'s/[^:]*:[^:]*//\')\n\n"
  " maybexterm=\n"
  " maybebg=\n"
  " case $mode in\n"
  " bg) maybebg=\'bg\';;\n"
  " xterm) maybexterm=xterm;;\n"
  " fg) ;;\n"
  " *) echo \"WARNING: Unknown Mode\";;\n"
  " esac\n\n"
  " if [ -z \"$ckpt_files_group\" ]; then\n"
  " break;\n"
  " fi\n\n"

  " new_ckpt_files_group=\"\"\n"
  " for tmp in $ckpt_files_group\n"
  " do\n"
  " if [ ! -z \"$DMTCP_RESTART_DIR\" ]; then\n"
  " tmp=$DMTCP_RESTART_DIR/$(basename $tmp)\n"
  " fi\n"
  " new_ckpt_files_group=\"$new_ckpt_files_group $tmp\"\n"
  " done\n\n"

  "tmpdir=\n"
  "if [ ! -z \"$DMTCP_TMPDIR\" ]; then\n"
  " tmpdir=\"--tmpdir $DMTCP_TMPDIR\"\n"
  "fi\n\n"

  " check_local $worker_host\n"
  " if [ \"$is_local_node\" -eq 1 -o \"$num_worker_hosts\" == \"1\" ]; then\n"
  " localhost_ckpt_files_group=\"$new_ckpt_files_group\"\n"
  " continue\n"
  " fi\n"

  " if [ -z $maybebg ]; then\n"
  " $maybexterm /usr/bin/ssh -t \"$worker_host\" \\\n"
  " $dmt_rstr_cmd --coord-host \"$coord_host\""
  " --coord-port \"$coord_port\"\\\n"
  " $ckpt_dir --join --interval \"$checkpoint_interval\" $tmpdir \\\n"
  " $new_ckpt_files_group\n"
  " else\n"
  " $maybexterm /usr/bin/ssh \"$worker_host\" \\\n"
  // In Open MPI 1.4, without this (sh -c ...), orterun hangs at the
  // end of the computation until user presses enter key.
  " \"/bin/sh -c \'$dmt_rstr_cmd --coord-host $coord_host"
  " --coord-port $coord_port\\\n"
  " $ckpt_dir --join --interval \"$checkpoint_interval\" $tmpdir \\\n"
  " $new_ckpt_files_group\'\" &\n"
  " fi\n\n"
  "done\n\n"
  "if [ -n \"$localhost_ckpt_files_group\" ]; then\n"
  "exec $dmt_rstr_cmd --coord-host \"$coord_host\""
  " --coord-port \"$coord_port\" \\\n"
  " $ckpt_dir $maybejoin --interval \"$checkpoint_interval\" $tmpdir $noStrictUidChecking $localhost_ckpt_files_group\n"
  "fi\n\n"

  "#wait for them all to finish\n"
  "wait\n"
;
411
412 static int thePort = -1;
413 static string thePortFile;
414
415 static bool exitOnLast = false;
416 static bool blockUntilDone = false;
417 static bool exitAfterCkpt = false;
418 static int blockUntilDoneRemote = -1;
419
420 static DmtcpCoordinator prog;
421
422 /* The coordinator can receive a second checkpoint request while processing the
423 * first one. If the second request comes at a point where the coordinator has
424 * broadcast DMT_DO_SUSPEND message but the workers haven't replied, the
425 * coordinator sends another DMT_DO_SUSPEND message. The workers, having
426 * replied to the first DMTCP_DO_SUSPEND message (by suspending all the user
427 * threads), are waiting for the next message (DMT_DO_FD_LEADER_ELECTION or
428 * DMT_KILL_PEER), however they receive DMT_DO_SUSPEND message and thus exit()
429 * indicating an error.
430 * The fix to this problem is to introduce a global
431 * variable "workersRunningAndSuspendMsgSent" which, as the name implies,
432 * indicates that the DMT_DO_SUSPEND message has been sent and the coordinator
433 * is waiting for replies from the workers. If this variable is set, the
434 * coordinator will not process another checkpoint request.
435 */
436 static bool workersRunningAndSuspendMsgSent = false;
437
438 static bool killInProgress = false;
439 static bool uniqueCkptFilenames = false;
440
441 /* If dmtcp_launch/dmtcp_restart specifies '-i', theCheckpointInterval
442 * will be reset accordingly (valid for current computation). If dmtcp_command
443 * specifies '-i' (or if user interactively invokes 'i' in coordinator),
444 * then both theCheckpointInterval and theDefaultCheckpointInterval are set.
445 * A value of '0' means: never checkpoint (manual checkpoint only).
446 */
447 static uint32_t theCheckpointInterval = 0; /* Current checkpoint interval */
448 static uint32_t theDefaultCheckpointInterval = 0; /* Reset to this on new comp. */
449 static bool isRestarting = false;
450 static bool timerExpired = false;
451
452 static void resetCkptTimer();
453
454 const int STDIN_FD = fileno ( stdin );
455
456 JTIMER ( checkpoint );
457 JTIMER ( restart );
458
459 static UniquePid compId;
460 static int numPeers = -1;
461 static time_t curTimeStamp = -1;
462 static time_t ckptTimeStamp = -1;
463
464 static LookupService lookupService;
465
466 static string coordHostname;
467 static struct in_addr localhostIPAddr;
468
469 static string tmpDir;
470 static string ckptDir;
471
472 #define MAX_EVENTS 10000
473 struct epoll_event events[MAX_EVENTS];
474 int epollFd;
475 static jalib::JSocket *listenSock = NULL;
476
477 static void removeStaleSharedAreaFile();
478 static void preExitCleanup();
479
480 static pid_t _nextVirtualPid = INITIAL_VIRTUAL_PID;
481
482 static int theNextClientNumber = 1;
483 vector<CoordClient*> clients;
484
485 CoordClient::CoordClient(const jalib::JSocket& sock,
486 const struct sockaddr_storage *addr,
487 socklen_t len,
488 DmtcpMessage &hello_remote,
489 int isNSWorker)
490 : _sock(sock)
491 {
492 _isNSWorker = isNSWorker;
493 _realPid = hello_remote.realPid;
494 _clientNumber = theNextClientNumber++;
495 _identity = hello_remote.from;
496 _state = hello_remote.state;
497 struct sockaddr_in *in = (struct sockaddr_in*) addr;
498 _ip = inet_ntoa(in->sin_addr);
499 }
500
501 void CoordClient::readProcessInfo(DmtcpMessage& msg)
502 {
503 if (msg.extraBytes > 0) {
504 char* extraData = new char[msg.extraBytes];
505 _sock.readAll(extraData, msg.extraBytes);
506 _hostname = extraData;
507 _progname = extraData + _hostname.length() + 1;
508 delete [] extraData;
509 }
510 }
511
512 pid_t DmtcpCoordinator::getNewVirtualPid()
513 {
514 pid_t pid = -1;
515 JASSERT(_virtualPidToClientMap.size() < MAX_VIRTUAL_PID/1000)
516 .Text("Exceeded maximum number of processes allowed");
517 while (1) {
518 pid = _nextVirtualPid;
519 _nextVirtualPid += 1000;
520 if (_nextVirtualPid > MAX_VIRTUAL_PID) {
521 _nextVirtualPid = INITIAL_VIRTUAL_PID;
522 }
523 if (_virtualPidToClientMap.find(pid) == _virtualPidToClientMap.end()) {
524 break;
525 }
526 }
527 JASSERT(pid != -1) .Text("Not Reachable");
528 return pid;
529 }
530
531 void DmtcpCoordinator::handleUserCommand(char cmd, DmtcpMessage* reply /*= NULL*/)
532 {
533 if (reply != NULL) reply->coordCmdStatus = CoordCmdStatus::NOERROR;
534
535 switch ( cmd ){
536 case 'b': case 'B': // prefix blocking command, prior to checkpoint command
537 JTRACE ( "blocking checkpoint beginning..." );
538 blockUntilDone = true;
539 break;
540 case 'x': case 'X': // prefix exit command, prior to checkpoint command
541 JTRACE ( "Will exit after creating the checkpoint..." );
542 exitAfterCkpt = true;
543 break;
544 case 'c': case 'C':
545 JTRACE ( "checkpointing..." );
546 if(startCheckpoint()){
547 if (reply != NULL) reply->numPeers = getStatus().numPeers;
548 }else{
549 if (reply != NULL) reply->coordCmdStatus = CoordCmdStatus::ERROR_NOT_RUNNING_STATE;
550 }
551 break;
552 case 'i': case 'I':
553 JTRACE("setting checkpoint interval...");
554 updateCheckpointInterval ( theCheckpointInterval );
555 if (theCheckpointInterval == 0)
556 printf("Current Checkpoint Interval:"
557 " Disabled (checkpoint manually instead)\n");
558 else
559 printf("Current Checkpoint Interval: %d\n", theCheckpointInterval);
560 if (theDefaultCheckpointInterval == 0)
561 printf("Default Checkpoint Interval:"
562 " Disabled (checkpoint manually instead)\n");
563 else
564 printf("Default Checkpoint Interval: %d\n", theDefaultCheckpointInterval);
565 break;
566 case 'l': case 'L':
567 case 't': case 'T':
568 JASSERT_STDERR << "Client List:\n";
569 JASSERT_STDERR << "#, PROG[virtPID:realPID]@HOST, DMTCP-UNIQUEPID, STATE\n";
570 for (size_t i = 0; i < clients.size(); i++) {
571 JASSERT_STDERR << clients[i]->clientNumber()
572 << ", " << clients[i]->progname()
573 << "[" << clients[i]->identity().pid() << ":" << clients[i]->realPid()
574 << "]@" << clients[i]->hostname()
575 #ifdef PRINT_REMOTE_IP
576 << "(" << clients[i]->ip() << ")"
577 #endif
578 << ", " << clients[i]->identity()
579 << ", " << clients[i]->state().toString()
580 << '\n';
581 }
582 break;
583 case 'q': case 'Q':
584 {
585 JNOTE ( "killing all connected peers and quitting ..." );
586 broadcastMessage ( DMT_KILL_PEER );
587 JASSERT_STDERR << "DMTCP coordinator exiting... (per request)\n";
588 for (size_t i = 0; i < clients.size(); i++) {
589 clients[i]->sock().close();
590 }
591 listenSock->close();
592 preExitCleanup();
593 JTRACE ("Exiting ...");
594 exit ( 0 );
595 break;
596 }
597 case 'k': case 'K':
598 JNOTE ( "Killing all connected Peers..." );
599 //FIXME: What happens if a 'k' command is followed by a 'c' command before
600 // the *real* broadcast takes place? --Kapil
601 broadcastMessage ( DMT_KILL_PEER );
602 break;
603 case 'h': case 'H': case '?':
604 JASSERT_STDERR << theHelpMessage;
605 break;
606 case 's': case 'S':
607 {
608 ComputationStatus s = getStatus();
609 bool running = (s.minimumStateUnanimous &&
610 s.minimumState==WorkerState::RUNNING);
611 if (reply != NULL) {
612 reply->numPeers = s.numPeers;
613 reply->isRunning = running;
614 reply->theCheckpointInterval = theCheckpointInterval;
615 } else {
616 printStatus(s.numPeers, running);
617 }
618 break;
619 }
620 case ' ': case '\t': case '\n': case '\r':
621 //ignore whitespace
622 break;
623 default:
624 JTRACE("unhandled user command")(cmd);
625 if (reply != NULL){
626 reply->coordCmdStatus = CoordCmdStatus::ERROR_INVALID_COMMAND;
627 }
628 }
629 return;
630 }
631
632 void DmtcpCoordinator::printStatus(size_t numPeers, bool isRunning)
633 {
634 ostringstream o;
635 o << "Status..." << std::endl
636 << "Host: " << coordHostname
637 << " (" << inet_ntoa(localhostIPAddr) << ")" << std::endl
638 << "Port: " << thePort << std::endl
639 << "Checkpoint Interval: ";
640
641 if (theCheckpointInterval == 0) {
642 o << "disabled (checkpoint manually instead)" << std::endl;
643 } else {
644 o << theCheckpointInterval << std::endl;
645 }
646
647 o << "Exit on last client: " << exitOnLast << std::endl
648 << "Exit after checkpoint: " << exitAfterCkpt << std::endl
649 << "Computation Id: " << compId << std::endl
650 << "Checkpoint Dir: " << ckptDir << std::endl
651 << "NUM_PEERS=" << numPeers << std::endl
652 << "RUNNING=" << (isRunning ? "yes" : "no") << std::endl;
653 printf("%s", o.str().c_str());
654 fflush(stdout);
655 }
656
657 void DmtcpCoordinator::updateMinimumState(WorkerState oldState)
658 {
659 WorkerState newState = minimumState();
660
661 if ( oldState == WorkerState::RUNNING
662 && newState == WorkerState::SUSPENDED )
663 {
664 JNOTE ( "locking all nodes" );
665 broadcastMessage(DMT_DO_FD_LEADER_ELECTION, getStatus().numPeers );
666 }
667 if ( oldState == WorkerState::SUSPENDED
668 && newState == WorkerState::FD_LEADER_ELECTION )
669 {
670 JNOTE ( "draining all nodes" );
671 broadcastMessage ( DMT_DO_DRAIN );
672 }
673 if ( oldState == WorkerState::FD_LEADER_ELECTION
674 && newState == WorkerState::DRAINED )
675 {
676 JNOTE ( "checkpointing all nodes" );
677 broadcastMessage ( DMT_DO_CHECKPOINT );
678 }
679
680 #ifdef COORD_NAMESERVICE
681 if ( oldState == WorkerState::DRAINED
682 && newState == WorkerState::CHECKPOINTED )
683 {
684 writeRestartScript();
685 if (exitAfterCkpt) {
686 JNOTE("Checkpoint Done. Killing all peers.");
687 broadcastMessage(DMT_KILL_PEER);
688 exitAfterCkpt = false;
689 } else {
690 JNOTE ( "building name service database" );
691 lookupService.reset();
692 broadcastMessage ( DMT_DO_REGISTER_NAME_SERVICE_DATA );
693 }
694 }
695 if ( oldState == WorkerState::RESTARTING
696 && newState == WorkerState::CHECKPOINTED )
697 {
698 JTIMER_STOP ( restart );
699
700 lookupService.reset();
701 JNOTE ( "building name service database (after restart)" );
702 broadcastMessage ( DMT_DO_REGISTER_NAME_SERVICE_DATA );
703 }
704 if ( oldState == WorkerState::CHECKPOINTED
705 && newState == WorkerState::NAME_SERVICE_DATA_REGISTERED ){
706 JNOTE ( "entertaining queries now" );
707 broadcastMessage ( DMT_DO_SEND_QUERIES );
708 }
709 if ( oldState == WorkerState::NAME_SERVICE_DATA_REGISTERED
710 && newState == WorkerState::DONE_QUERYING ){
711 JNOTE ( "refilling all nodes" );
712 broadcastMessage ( DMT_DO_REFILL );
713 }
714 if ( oldState == WorkerState::DONE_QUERYING
715 && newState == WorkerState::REFILLED )
716 /* then broadcastMessage(DMT_DO_RESUME) after the #endif, below */
717 #else
718 if ( oldState == WorkerState::DRAINED
719 && newState == WorkerState::CHECKPOINTED )
720 {
721 writeRestartScript();
722 if (exitAfterCkpt) {
723 JNOTE("Checkpoint Done. Killing all peers.");
724 broadcastMessage(DMT_KILL_PEER);
725 exitAfterCkpt = false;
726 } else {
727 JNOTE ( "refilling all nodes" );
728 broadcastMessage ( DMT_DO_REFILL );
729 }
730 }
731 if ( oldState == WorkerState::RESTARTING
732 && newState == WorkerState::CHECKPOINTED )
733 {
734 JTIMER_STOP ( restart );
735
736 JNOTE ( "refilling all nodes (after checkpoint)" );
737 broadcastMessage ( DMT_DO_REFILL );
738 }
739 if ( oldState == WorkerState::CHECKPOINTED
740 && newState == WorkerState::REFILLED )
741 #endif
742 {
743 JNOTE ( "restarting all nodes" );
744 broadcastMessage ( DMT_DO_RESUME );
745
746 JTIMER_STOP ( checkpoint );
747 isRestarting = false;
748
749 resetCkptTimer();
750
751 if (blockUntilDone) {
752 DmtcpMessage blockUntilDoneReply(DMT_USER_CMD_RESULT);
753 JNOTE ( "replying to dmtcp_command: we're done" );
754 // These were set in DmtcpCoordinator::onConnect in this file
755 jalib::JSocket remote ( blockUntilDoneRemote );
756 remote << blockUntilDoneReply;
757 remote.close();
758 blockUntilDone = false;
759 blockUntilDoneRemote = -1;
760 }
761 }
762 }
763
764 void DmtcpCoordinator::onData(CoordClient *client)
765 {
766 DmtcpMessage msg;
767 JASSERT(client != NULL);
768
769 client->sock() >> msg;
770 msg.assertValid();
771 char *extraData = 0;
772 if (msg.extraBytes > 0) {
773 extraData = new char[msg.extraBytes];
774 client->sock().readAll(extraData, msg.extraBytes);
775 }
776
777 switch ( msg.type )
778 {
779 case DMT_OK:
780 {
781 WorkerState oldState = client->state();
782 client->setState ( msg.state );
783 ComputationStatus s = getStatus();
784 WorkerState newState = s.minimumState;
785
786 JTRACE ("got DMT_OK message")
787 ( oldState )( msg.from )( msg.state )( newState );
788
789 updateMinimumState(oldState);
790 break;
791 }
792 case DMT_UNIQUE_CKPT_FILENAME:
793 uniqueCkptFilenames = true;
794 // Fall though
795 case DMT_CKPT_FILENAME:
796 {
797 JASSERT ( extraData!=0 )
798 .Text ( "extra data expected with DMT_CKPT_FILENAME message" );
799 string ckptFilename;
800 string hostname;
801 ckptFilename = extraData;
802 hostname = extraData + ckptFilename.length() + 1;
803
804 JTRACE ( "recording restart info" ) ( ckptFilename ) ( hostname );
805 _restartFilenames[hostname].push_back ( ckptFilename );
806 }
807 break;
808 case DMT_GET_CKPT_DIR:
809 {
810 DmtcpMessage reply(DMT_GET_CKPT_DIR_RESULT);
811 reply.extraBytes = ckptDir.length() + 1;
812 client->sock() << reply;
813 client->sock().writeAll(ckptDir.c_str(), reply.extraBytes);
814 }
815 break;
816 case DMT_UPDATE_CKPT_DIR:
817 {
818 JASSERT(extraData != 0)
819 .Text("extra data expected with DMT_UPDATE_CKPT_DIR message");
820 if (strcmp(ckptDir.c_str(), extraData) != 0) {
821 ckptDir = extraData;
822 JNOTE("Updated ckptDir") (ckptDir);
823 }
824 }
825 break;
826
827 #ifdef COORD_NAMESERVICE
828 case DMT_REGISTER_NAME_SERVICE_DATA:
829 {
830 JTRACE ("received REGISTER_NAME_SERVICE_DATA msg") (client->identity());
831 lookupService.registerData(msg, (const void*) extraData);
832 }
833 break;
834
835 case DMT_REGISTER_NAME_SERVICE_DATA_SYNC:
836 {
837 JTRACE ("received REGISTER_NAME_SERVICE_DATA_SYNC msg") (client->identity());
838 lookupService.registerData(msg, (const void*) extraData);
839 DmtcpMessage response(DMT_REGISTER_NAME_SERVICE_DATA_SYNC_RESPONSE);
840 JTRACE("Sending NS response to the client...");
841 client->sock() << response;
842 }
843 break;
844 case DMT_NAME_SERVICE_QUERY:
845 {
846 JTRACE ("received NAME_SERVICE_QUERY msg") (client->identity());
847 lookupService.respondToQuery(client->sock(), msg,
848 (const void*) extraData);
849 }
850 break;
851 #endif
852
853 case DMT_UPDATE_PROCESS_INFO_AFTER_FORK:
854 {
855 JNOTE("Updating process Information after fork()")
856 (client->hostname()) (client->progname()) (msg.from) (client->identity());
857 client->identity(msg.from);
858 client->realPid(msg.realPid);
859 }
860 break;
861 case DMT_UPDATE_PROCESS_INFO_AFTER_INIT_OR_EXEC:
862 {
863 string progname = extraData;
864 JNOTE("Updating process Information after exec()")
865 (progname) (msg.from) (client->identity());
866 client->progname(progname);
867 client->identity(msg.from);
868 }
869 break;
870
871 case DMT_NULL:
872 JWARNING(false) (msg.type) .Text("unexpected message from worker. Closing connection");
873 onDisconnect(client);
874 break;
875 default:
876 JASSERT ( false ) ( msg.from ) ( msg.type )
877 .Text ( "unexpected message from worker" );
878 }
879
880 delete[] extraData;
881 }
882
883
884 static void removeStaleSharedAreaFile()
885 {
886 ostringstream o;
887 o << tmpDir
888 << "/dmtcpSharedArea." << compId << "." << std::hex << curTimeStamp;
889 JTRACE("Removing sharedArea file.") (o.str());
890 unlink(o.str().c_str());
891 }
892
893 static void preExitCleanup()
894 {
895 removeStaleSharedAreaFile();
896 JTRACE("Removing port-file") (thePortFile);
897 unlink(thePortFile.c_str());
898 }
899
900 void DmtcpCoordinator::onDisconnect(CoordClient *client)
901 {
902 if (client->isNSWorker()) {
903 client->sock().close();
904 delete client;
905 return;
906 }
907 for (size_t i = 0; i < clients.size(); i++) {
908 if (clients[i] == client) {
909 clients.erase(clients.begin() + i);
910 break;
911 }
912 }
913 client->sock().close();
914 JNOTE ( "client disconnected" ) ( client->identity() );
915 _virtualPidToClientMap.erase(client->virtualPid());
916
917 ComputationStatus s = getStatus();
918 if (s.numPeers < 1) {
919 if (exitOnLast) {
920 JNOTE ("last client exited, shutting down..");
921 handleUserCommand('q');
922 } else {
923 removeStaleSharedAreaFile();
924 }
925 // If a kill in is progress, the coordinator refuses any new connections,
926 // thus we need to reset it to false once all the processes in the
927 // computations have disconnected.
928 killInProgress = false;
929 if (theCheckpointInterval != theDefaultCheckpointInterval) {
930 updateCheckpointInterval(theDefaultCheckpointInterval);
931 JNOTE ( "CheckpointInterval reset on end of current computation" )
932 ( theCheckpointInterval );
933 }
934 } else {
935 updateMinimumState(client->state());
936 }
937 }
938
939 void DmtcpCoordinator::initializeComputation()
940 {
941 //this is the first connection, do some initializations
942 workersRunningAndSuspendMsgSent = false;
943 killInProgress = false;
944 //_nextVirtualPid = INITIAL_VIRTUAL_PID;
945
946 // theCheckpointInterval can be overridden later by msg from this client.
947 updateCheckpointInterval( theDefaultCheckpointInterval );
948
949 // drop current computation group to 0
950 compId = UniquePid(0,0,0);
951 curTimeStamp = 0; // Drop timestamp to 0
952 numPeers = -1; // Drop number of peers to unknown
953 blockUntilDone = false;
954 //exitAfterCkpt = false;
955 }
956
957 void DmtcpCoordinator::onConnect()
958 {
959 struct sockaddr_storage remoteAddr;
960 socklen_t remoteLen = sizeof(remoteAddr);
961 jalib::JSocket remote = listenSock->accept(&remoteAddr, &remoteLen);
962 JTRACE("accepting new connection") (remote.sockfd()) (JASSERT_ERRNO);
963
964 if (!remote.isValid()) {
965 remote.close();
966 return;
967 }
968
969 DmtcpMessage hello_remote;
970 hello_remote.poison();
971 JTRACE("Reading from incoming connection...");
972 remote >> hello_remote;
973 if (!remote.isValid()) {
974 remote.close();
975 return;
976 }
977
978 #ifdef COORD_NAMESERVICE
979 if (hello_remote.type == DMT_NAME_SERVICE_WORKER) {
980 CoordClient *client = new CoordClient(remote, &remoteAddr, remoteLen,
981 hello_remote);
982
983 addDataSocket(client);
984 return;
985 }
986 if (hello_remote.type == DMT_NAME_SERVICE_QUERY) {
987 JASSERT(hello_remote.extraBytes > 0) (hello_remote.extraBytes);
988 char *extraData = new char[hello_remote.extraBytes];
989 remote.readAll(extraData, hello_remote.extraBytes);
990
991 JTRACE ("received NAME_SERVICE_QUERY msg on running") (hello_remote.from);
992 lookupService.respondToQuery(remote, hello_remote, extraData);
993 delete [] extraData;
994 remote.close();
995 return;
996 }
997 if (hello_remote.type == DMT_REGISTER_NAME_SERVICE_DATA) {
998 JASSERT(hello_remote.extraBytes > 0) (hello_remote.extraBytes);
999 char *extraData = new char[hello_remote.extraBytes];
1000 remote.readAll(extraData, hello_remote.extraBytes);
1001
1002 JTRACE ("received REGISTER_NAME_SERVICE_DATA msg on running") (hello_remote.from);
1003 lookupService.registerData(hello_remote, (const void*) extraData);
1004 delete [] extraData;
1005 remote.close();
1006 return;
1007 }
1008 if (hello_remote.type == DMT_REGISTER_NAME_SERVICE_DATA_SYNC) {
1009 JASSERT(hello_remote.extraBytes > 0) (hello_remote.extraBytes);
1010 char *extraData = new char[hello_remote.extraBytes];
1011 remote.readAll(extraData, hello_remote.extraBytes);
1012
1013 JTRACE ("received REGISTER_NAME_SERVICE_DATA msg on running") (hello_remote.from);
1014 lookupService.registerData(hello_remote, (const void*) extraData);
1015 delete [] extraData;
1016 DmtcpMessage response(DMT_REGISTER_NAME_SERVICE_DATA_SYNC_RESPONSE);
1017 JTRACE("Reading from incoming connection...");
1018 remote << response;
1019 remote.close();
1020 return;
1021 }
1022 #endif
1023
1024 if (hello_remote.type == DMT_USER_CMD) {
1025 // TODO(kapil): Update ckpt interval only if a valid one was supplied to
1026 // dmtcp_command.
1027 updateCheckpointInterval(hello_remote.theCheckpointInterval);
1028 processDmtUserCmd(hello_remote, remote);
1029 return;
1030 }
1031
1032 if (killInProgress) {
1033 JNOTE("Connection request received in the middle of killing computation. "
1034 "Sending it the kill message.");
1035 DmtcpMessage msg;
1036 msg.type = DMT_KILL_PEER;
1037 remote << msg;
1038 remote.close();
1039 return;
1040 }
1041
1042 // If no client is connected to Coordinator, then there can be only zero data
1043 // sockets OR there can be one data socket and that should be STDIN.
1044 if (clients.size() == 0) {
1045 initializeComputation();
1046 }
1047
1048 CoordClient *client = new CoordClient(remote, &remoteAddr, remoteLen,
1049 hello_remote);
1050
1051 if( hello_remote.extraBytes > 0 ){
1052 client->readProcessInfo(hello_remote);
1053 }
1054
1055 if (hello_remote.type == DMT_RESTART_WORKER) {
1056 if (!validateRestartingWorkerProcess(hello_remote, remote,
1057 &remoteAddr, remoteLen)) {
1058 return;
1059 }
1060 client->virtualPid(hello_remote.from.pid());
1061 _virtualPidToClientMap[client->virtualPid()] = client;
1062 isRestarting = true;
1063 } else if (hello_remote.type == DMT_NEW_WORKER) {
1064 JASSERT(hello_remote.state == WorkerState::RUNNING ||
1065 hello_remote.state == WorkerState::UNKNOWN);
1066 JASSERT(hello_remote.virtualPid == -1);
1067 client->virtualPid(getNewVirtualPid());
1068 if (!validateNewWorkerProcess(hello_remote, remote, client,
1069 &remoteAddr, remoteLen)) {
1070 return;
1071 }
1072 _virtualPidToClientMap[client->virtualPid()] = client;
1073 } else {
1074 JASSERT(false) (hello_remote.type)
1075 .Text("Connect request from Unknown Remote Process Type");
1076 }
1077
1078 updateCheckpointInterval(hello_remote.theCheckpointInterval);
1079 JNOTE ( "worker connected" ) ( hello_remote.from );
1080
1081 clients.push_back(client);
1082 addDataSocket(client);
1083
1084 JTRACE("END") (clients.size());
1085 }
1086
1087 void DmtcpCoordinator::processDmtUserCmd(DmtcpMessage& hello_remote,
1088 jalib::JSocket& remote )
1089 {
1090 //dmtcp_command doesn't handshake (it is antisocial)
1091 JTRACE("got user command from dmtcp_command")(hello_remote.coordCmd);
1092 DmtcpMessage reply;
1093 reply.type = DMT_USER_CMD_RESULT;
1094 // if previous 'b' blocking prefix command had set blockUntilDone
1095 if (blockUntilDone && blockUntilDoneRemote == -1 &&
1096 hello_remote.coordCmd == 'c') {
1097 // Reply will be done in DmtcpCoordinator::onData in this file.
1098 blockUntilDoneRemote = remote.sockfd();
1099 handleUserCommand( hello_remote.coordCmd, &reply );
1100 } else if ( (hello_remote.coordCmd == 'i')
1101 && hello_remote.theCheckpointInterval >= 0 ) {
1102 // theDefaultCheckpointInterval = hello_remote.theCheckpointInterval;
1103 // theCheckpointInterval = theDefaultCheckpointInterval;
1104 handleUserCommand( hello_remote.coordCmd, &reply );
1105 remote << reply;
1106 remote.close();
1107 } else {
1108 handleUserCommand( hello_remote.coordCmd, &reply );
1109 remote << reply;
1110 remote.close();
1111 }
1112 return;
1113 }
1114
1115 bool DmtcpCoordinator::validateRestartingWorkerProcess(
1116 DmtcpMessage& hello_remote,
1117 jalib::JSocket& remote,
1118 const struct sockaddr_storage* remoteAddr,
1119 socklen_t remoteLen)
1120 {
1121 struct timeval tv;
1122 const struct sockaddr_in *sin = (const struct sockaddr_in*) remoteAddr;
1123 string remoteIP = inet_ntoa(sin->sin_addr);
1124 DmtcpMessage hello_local ( DMT_ACCEPT );
1125
1126 JASSERT(hello_remote.state == WorkerState::RESTARTING) (hello_remote.state);
1127
1128 if (compId == UniquePid(0,0,0)) {
1129 JASSERT ( minimumState() == WorkerState::UNKNOWN )
1130 .Text ( "Coordinator should be idle at this moment" );
1131 // Coordinator is free at this moment - set up all the things
1132 compId = hello_remote.compGroup;
1133 numPeers = hello_remote.numPeers;
1134 JASSERT(gettimeofday(&tv, NULL) == 0);
1135 // Get the resolution down to 100 mili seconds.
1136 curTimeStamp = (tv.tv_sec << 4) | (tv.tv_usec / (100*1000));
1137 JNOTE ( "FIRST dmtcp_restart connection. Set numPeers. Generate timestamp" )
1138 ( numPeers ) ( curTimeStamp ) ( compId );
1139 JTIMER_START(restart);
1140 } else if (minimumState() != WorkerState::RESTARTING &&
1141 minimumState() != WorkerState::CHECKPOINTED) {
1142 JNOTE ("Computation not in RESTARTING or CHECKPOINTED state."
1143 " Reject incoming computation process requesting restart.")
1144 (compId) (hello_remote.compGroup) (minimumState());
1145 hello_local.type = DMT_REJECT_NOT_RESTARTING;
1146 remote << hello_local;
1147 remote.close();
1148 return false;
1149 } else if ( hello_remote.compGroup != compId) {
1150 JNOTE ("Reject incoming computation process requesting restart,"
1151 " since it is not from current computation.")
1152 ( compId ) ( hello_remote.compGroup );
1153 hello_local.type = DMT_REJECT_WRONG_COMP;
1154 remote << hello_local;
1155 remote.close();
1156 return false;
1157 }
1158 // dmtcp_restart already connected and compGroup created.
1159 // Computation process connection
1160 JASSERT ( curTimeStamp != 0 );
1161
1162 JTRACE("Connection from (restarting) computation process")
1163 ( compId ) ( hello_remote.compGroup ) ( minimumState() );
1164
1165 hello_local.coordTimeStamp = curTimeStamp;
1166 if (Util::strStartsWith(remoteIP, "127.")) {
1167 memcpy(&hello_local.ipAddr, &localhostIPAddr, sizeof localhostIPAddr);
1168 } else {
1169 memcpy(&hello_local.ipAddr, &sin->sin_addr, sizeof localhostIPAddr);
1170 }
1171 remote << hello_local;
1172
1173 // NOTE: Sending the same message twice. We want to make sure that the
1174 // worker process receives/processes the first messages as soon as it
1175 // connects to the coordinator. The second message will be processed in
1176 // postRestart routine in DmtcpWorker.
1177 //
1178 // The reason to do this is the following. The dmtcp_restart process
1179 // connects to the coordinator at a very early stage. Later on, before
1180 // exec()'ing into mtcp_restart, it reconnects to the coordinator using
1181 // it's original UniquiePid and closes the earlier socket connection.
1182 // However, the coordinator might process the disconnect() before it
1183 // processes the connect() which would lead to a situation where the
1184 // coordinator is not connected to any worker processes. The coordinator
1185 // would now process the connect() and may reject the worker because the
1186 // worker state is RESTARTING, but the minimumState() is UNKNOWN.
1187 //remote << hello_local;
1188
1189 return true;
1190 }
1191
1192 bool DmtcpCoordinator::validateNewWorkerProcess(
1193 DmtcpMessage& hello_remote,
1194 jalib::JSocket& remote,
1195 CoordClient *client,
1196 const struct sockaddr_storage* remoteAddr,
1197 socklen_t remoteLen)
1198 {
1199 const struct sockaddr_in *sin = (const struct sockaddr_in*) remoteAddr;
1200 string remoteIP = inet_ntoa(sin->sin_addr);
1201 DmtcpMessage hello_local(DMT_ACCEPT);
1202 hello_local.virtualPid = client->virtualPid();
1203 ComputationStatus s = getStatus();
1204
1205 JASSERT(hello_remote.state == WorkerState::RUNNING ||
1206 hello_remote.state == WorkerState::UNKNOWN) (hello_remote.state);
1207
1208 if (workersRunningAndSuspendMsgSent == true) {
1209 /* Worker trying to connect after SUSPEND message has been sent.
1210 * This happens if the worker process is executing a fork() system call
1211 * when the DMT_DO_SUSPEND is broadcast. We need to make sure that the
1212 * child process is allowed to participate in the current checkpoint.
1213 */
1214 JASSERT(s.numPeers > 0) (s.numPeers);
1215 JASSERT(s.minimumState != WorkerState::SUSPENDED) (s.minimumState);
1216
1217 // Handshake
1218 hello_local.compGroup = compId;
1219 remote << hello_local;
1220
1221 // Now send DMT_DO_SUSPEND message so that this process can also
1222 // participate in the current checkpoint
1223 DmtcpMessage suspendMsg (DMT_DO_SUSPEND);
1224 suspendMsg.compGroup = compId;
1225 remote << suspendMsg;
1226
1227 } else if (s.numPeers > 0 && s.minimumState != WorkerState::RUNNING &&
1228 s.minimumState != WorkerState::UNKNOWN) {
1229 // If some of the processes are not in RUNNING state
1230 JNOTE("Current computation not in RUNNING state."
1231 " Refusing to accept new connections.")
1232 (compId) (hello_remote.from)
1233 (s.numPeers) (s.minimumState);
1234 hello_local.type = DMT_REJECT_NOT_RUNNING;
1235 remote << hello_local;
1236 remote.close();
1237 return false;
1238
1239 } else if (hello_remote.compGroup != UniquePid()) {
1240 // New Process trying to connect to Coordinator but already has compGroup
1241 JNOTE ("New process not part of currently running computation group"
1242 "on this coordinator. Rejecting.")
1243 (hello_remote.compGroup);
1244
1245 hello_local.type = DMT_REJECT_WRONG_COMP;
1246 remote << hello_local;
1247 remote.close();
1248 return false;
1249
1250 } else {
1251 // If first process, create the new computation group
1252 if (compId == UniquePid(0,0,0)) {
1253 struct timeval tv;
1254 // Connection of new computation.
1255 compId = UniquePid(hello_remote.from.hostid(), client->virtualPid(),
1256 hello_remote.from.time(),
1257 hello_remote.from.computationGeneration());
1258
1259 JASSERT(gettimeofday(&tv, NULL) == 0);
1260 // Get the resolution down to 100 mili seconds.
1261 curTimeStamp = (tv.tv_sec << 4) | (tv.tv_usec / (100*1000));
1262 numPeers = -1;
1263 JTRACE("First process connected. Creating new computation group.")
1264 (compId);
1265 } else {
1266 JTRACE("New process connected")
1267 (hello_remote.from) (client->virtualPid());
1268 }
1269 hello_local.compGroup = compId;
1270 hello_local.coordTimeStamp = curTimeStamp;
1271 if (Util::strStartsWith(remoteIP, "127.")) {
1272 memcpy(&hello_local.ipAddr, &localhostIPAddr, sizeof localhostIPAddr);
1273 } else {
1274 memcpy(&hello_local.ipAddr, &sin->sin_addr, sizeof localhostIPAddr);
1275 }
1276 remote << hello_local;
1277 }
1278 return true;
1279 }
1280
1281 bool DmtcpCoordinator::startCheckpoint()
1282 {
1283 uniqueCkptFilenames = false;
1284 ComputationStatus s = getStatus();
1285 if ( s.minimumState == WorkerState::RUNNING && s.minimumStateUnanimous
1286 && !workersRunningAndSuspendMsgSent )
1287 {
1288 time(&ckptTimeStamp);
1289 JTIMER_START ( checkpoint );
1290 _restartFilenames.clear();
1291 JNOTE ( "starting checkpoint, suspending all nodes" )( s.numPeers );
1292 compId.incrementGeneration();
1293 JNOTE("Incremented computationGeneration") (compId.computationGeneration());
1294 // Pass number of connected peers to all clients
1295 broadcastMessage(DMT_DO_SUSPEND);
1296
1297 // Suspend Message has been sent but the workers are still in running
1298 // state. If the coordinator receives another checkpoint request from user
1299 // at this point, it should fail.
1300 workersRunningAndSuspendMsgSent = true;
1301 return true;
1302 } else {
1303 if (s.numPeers > 0) {
1304 JTRACE ( "delaying checkpoint, workers not ready" ) ( s.minimumState )
1305 ( s.numPeers );
1306 }
1307 return false;
1308 }
1309 }
1310
1311 void DmtcpCoordinator::broadcastMessage(DmtcpMessageType type, int numPeers)
1312 {
1313 DmtcpMessage msg;
1314 msg.type = type;
1315 msg.compGroup = compId;
1316 if (numPeers > 0) {
1317 msg.numPeers = numPeers;
1318 }
1319
1320 if (msg.type == DMT_KILL_PEER && clients.size() > 0) {
1321 killInProgress = true;
1322 } else if (msg.type == DMT_DO_FD_LEADER_ELECTION) {
1323 // All the workers are in SUSPENDED state, now it is safe to reset
1324 // this flag.
1325 workersRunningAndSuspendMsgSent = false;
1326 }
1327
1328 for (size_t i = 0; i < clients.size(); i++) {
1329 clients[i]->sock() << msg;
1330 }
1331 JTRACE ("sending message")( type );
1332 }
1333
1334 DmtcpCoordinator::ComputationStatus DmtcpCoordinator::getStatus() const
1335 {
1336 ComputationStatus status;
1337 const static int INITIAL_MIN = WorkerState::_MAX;
1338 const static int INITIAL_MAX = WorkerState::UNKNOWN;
1339 int min = INITIAL_MIN;
1340 int max = INITIAL_MAX;
1341 int count = 0;
1342 bool unanimous = true;
1343 for (size_t i = 0; i < clients.size(); i++) {
1344 int cliState = clients[i]->state().value();
1345 count++;
1346 unanimous = unanimous && (min==cliState || min==INITIAL_MIN);
1347 if ( cliState < min ) min = cliState;
1348 if ( cliState > max ) max = cliState;
1349 }
1350
1351 status.minimumState = ( min==INITIAL_MIN ? WorkerState::UNKNOWN
1352 : (WorkerState::eWorkerState)min );
1353 if( status.minimumState == WorkerState::CHECKPOINTED &&
1354 isRestarting && count < numPeers ){
1355 JTRACE("minimal state counted as CHECKPOINTED but not all processes"
1356 " are connected yet. So we wait.") ( numPeers ) ( count );
1357 status.minimumState = WorkerState::RESTARTING;
1358 }
1359 status.minimumStateUnanimous = unanimous;
1360
1361 status.maximumState = ( max==INITIAL_MAX ? WorkerState::UNKNOWN
1362 : (WorkerState::eWorkerState)max );
1363 status.numPeers = count;
1364 return status;
1365 }
1366
1367 void DmtcpCoordinator::writeRestartScript()
1368 {
1369 ostringstream o;
1370 string uniqueFilename;
1371
1372 o << string(ckptDir) << "/"
1373 << RESTART_SCRIPT_BASENAME << "_" << compId;
1374 if (uniqueCkptFilenames) {
1375 o << "_" << std::setw(5) << std::setfill('0') << compId.computationGeneration();
1376 }
1377 o << "." << RESTART_SCRIPT_EXT;
1378 uniqueFilename = o.str();
1379
1380 const bool isSingleHost = (_restartFilenames.size() == 1);
1381
1382 map< string, vector<string> >::const_iterator host;
1383 vector<string>::const_iterator file;
1384
1385 char hostname[80];
1386 char timestamp[80];
1387 gethostname ( hostname, 80 );
1388
1389 JTRACE ( "writing restart script" ) ( uniqueFilename );
1390
1391 FILE* fp = fopen ( uniqueFilename.c_str(),"w" );
1392 JASSERT ( fp!=0 )(JASSERT_ERRNO)( uniqueFilename )
1393 .Text ( "failed to open file" );
1394
1395 fprintf ( fp, "%s", theRestartScriptHeader );
1396 fprintf ( fp, "%s", theRestartScriptCheckLocal );
1397 fprintf ( fp, "%s", slurmHelperContactFunction );
1398 fprintf ( fp, "%s", theRestartScriptUsage );
1399
1400 ctime_r(&ckptTimeStamp, timestamp);
1401 // Remove the trailing '\n'
1402 timestamp[strlen(timestamp) - 1] = '\0';
1403 fprintf ( fp, "ckpt_timestamp=\"%s\"\n\n", timestamp );
1404
1405 fprintf ( fp, "coord_host=$" ENV_VAR_NAME_HOST "\n"
1406 "if test -z \"$" ENV_VAR_NAME_HOST "\"; then\n"
1407 " coord_host=%s\nfi\n\n"
1408 "coord_port=$" ENV_VAR_NAME_PORT "\n"
1409 "if test -z \"$" ENV_VAR_NAME_PORT "\"; then\n"
1410 " coord_port=%d\nfi\n\n"
1411 "checkpoint_interval=$" ENV_VAR_CKPT_INTR "\n"
1412 "if test -z \"$" ENV_VAR_CKPT_INTR "\"; then\n"
1413 " checkpoint_interval=%d\nfi\n"
1414 "export DMTCP_CHECKPOINT_INTERVAL=${checkpoint_interval}\n\n",
1415 hostname, thePort, theCheckpointInterval );
1416
1417 fprintf ( fp, "%s", theRestartScriptCmdlineArgHandler );
1418
1419 fprintf ( fp, "dmt_rstr_cmd=%s/" DMTCP_RESTART_CMD "\n"
1420 "which $dmt_rstr_cmd > /dev/null 2>&1"
1421 " || dmt_rstr_cmd=" DMTCP_RESTART_CMD "\n"
1422 "which $dmt_rstr_cmd > /dev/null 2>&1"
1423 " || echo \"$0: $dmt_rstr_cmd not found\"\n"
1424 "which $dmt_rstr_cmd > /dev/null 2>&1 || exit 1\n\n",
1425 jalib::Filesystem::GetProgramDir().c_str());
1426
1427 fprintf ( fp, "# Number of hosts in the computation = %zd\n"
1428 "# Number of processes in the computation = %d\n\n",
1429 _restartFilenames.size(), getStatus().numPeers );
1430
1431 if ( isSingleHost ) {
1432 JTRACE ( "Single HOST" );
1433
1434 host=_restartFilenames.begin();
1435 ostringstream o;
1436 for ( file=host->second.begin(); file!=host->second.end(); ++file ) {
1437 o << " " << *file;
1438 }
1439 fprintf ( fp, "given_ckpt_files=\"%s\"\n\n", o.str().c_str());
1440
1441 fprintf ( fp, "%s", theRestartScriptSingleHostProcessing );
1442 }
1443 else
1444 {
1445 fprintf ( fp, "%s",
1446 "# SYNTAX:\n"
1447 "# :: <HOST> :<MODE>: <CHECKPOINT_IMAGE> ...\n"
1448 "# Host names and filenames must not include \':\'\n"
1449 "# At most one fg (foreground) mode allowed; it must be last.\n"
1450 "# \'maybexterm\' and \'maybebg\' are set from <MODE>.\n");
1451
1452 fprintf ( fp, "%s", "worker_ckpts=\'" );
1453 for ( host=_restartFilenames.begin(); host!=_restartFilenames.end(); ++host ) {
1454 fprintf ( fp, "\n :: %s :bg:", host->first.c_str() );
1455 for ( file=host->second.begin(); file!=host->second.end(); ++file ) {
1456 fprintf ( fp," %s", file->c_str() );
1457 }
1458 }
1459 fprintf ( fp, "%s", "\n\'\n\n" );
1460
1461 fprintf( fp, "# Check for resource manager\n"
1462 "ibrun_path=$(which ibrun 2> /dev/null)\n"
1463 "if [ ! -n \"$ibrun_path\" ]; then\n"
1464 " discover_rm_path=$(which dmtcp_discover_rm)\n"
1465 " if [ -n \"$discover_rm_path\" ]; then\n"
1466 " eval $(dmtcp_discover_rm -t)\n"
1467 " srun_path=$(which srun 2> /dev/null)\n"
1468 " llaunch=`which dmtcp_rm_loclaunch`\n"
1469 " if [ $RES_MANAGER = \"SLURM\" ] && [ -n \"$srun_path\" ]; then\n"
1470 " eval $(dmtcp_discover_rm -n \"$worker_ckpts\")\n"
1471 " if [ -n \"$DMTCP_DISCOVER_RM_ERROR\" ]; then\n"
1472 " echo \"Restart error: $DMTCP_DISCOVER_RM_ERROR\"\n"
1473 " echo \"Allocated resources: $manager_resources\"\n"
1474 " exit 0\n"
1475 " fi\n"
1476 " export DMTCP_REMLAUNCH_NODES=$DMTCP_REMLAUNCH_NODES\n"
1477 " bound=$(($DMTCP_REMLAUNCH_NODES - 1))\n"
1478 " for i in $(seq 0 $bound); do\n"
1479 " eval \"val=\\${DMTCP_REMLAUNCH_${i}_SLOTS}\"\n"
1480 " export DMTCP_REMLAUNCH_${i}_SLOTS=\"$val\"\n"
1481 " bound2=$(($val - 1))\n"
1482 " for j in $(seq 0 $bound2); do\n"
1483 " eval \"ckpts=\\${DMTCP_REMLAUNCH_${i}_${j}}\"\n"
1484 " export DMTCP_REMLAUNCH_${i}_${j}=\"$ckpts\"\n"
1485 " done\n"
1486 " done\n"
1487 " if [ \"$DMTCP_DISCOVER_PM_TYPE\" = \"HYDRA\" ]; then\n"
1488 " export DMTCP_SRUN_HELPER_SYNCFILE=`mktemp ./tmp.XXXXXXXXXX`\n"
1489 " rm $DMTCP_SRUN_HELPER_SYNCFILE\n"
1490 " dmtcp_srun_helper -r $srun_path \"$llaunch\"\n"
1491 " if [ ! -f $DMTCP_SRUN_HELPER_SYNCFILE ]; then\n"
1492 " echo \"Error launching application\"\n"
1493 " exit 1\n"
1494 " fi\n"
1495 " # export helper contact info\n"
1496 " . $DMTCP_SRUN_HELPER_SYNCFILE\n"
1497 " pass_slurm_helper_contact \"$DMTCP_LAUNCH_CKPTS\"\n"
1498 " rm $DMTCP_SRUN_HELPER_SYNCFILE\n"
1499 " dmtcp_restart --join --coord-host $DMTCP_COORD_HOST"
1500 " --coord-port $DMTCP_COORD_PORT"
1501 " $DMTCP_LAUNCH_CKPTS\n"
1502 " else\n"
1503 " DMTCP_REMLAUNCH_0_0=\"$DMTCP_REMLAUNCH_0_0"
1504 " $DMTCP_LAUNCH_CKPTS\"\n"
1505 " $srun_path \"$llaunch\"\n"
1506 " fi\n"
1507 " exit 0\n"
1508 " elif [ $RES_MANAGER = \"TORQUE\" ]; then\n"
1509 " #eval $(dmtcp_discover_rm \"$worker_ckpts\")\n"
1510 " #if [ -n \"$new_worker_ckpts\" ]; then\n"
1511 " # worker_ckpts=\"$new_worker_ckpts\"\n"
1512 " #fi\n"
1513 " eval $(dmtcp_discover_rm -n \"$worker_ckpts\")\n"
1514 " if [ -n \"$DMTCP_DISCOVER_RM_ERROR\" ]; then\n"
1515 " echo \"Restart error: $DMTCP_DISCOVER_RM_ERROR\"\n"
1516 " echo \"Allocated resources: $manager_resources\"\n"
1517 " exit 0\n"
1518 " fi\n"
1519 " arguments=\"PATH=$PATH DMTCP_COORD_HOST=$DMTCP_COORD_HOST"
1520 " DMTCP_COORD_PORT=$DMTCP_COORD_PORT\"\n"
1521 " arguments=$arguments\" DMTCP_CHECKPOINT_INTERVAL=$DMTCP_CHECKPOINT_INTERVAL\"\n"
1522 " arguments=$arguments\" DMTCP_TMPDIR=$DMTCP_TMPDIR\"\n"
1523 " arguments=$arguments\" DMTCP_REMLAUNCH_NODES=$DMTCP_REMLAUNCH_NODES\"\n"
1524 " bound=$(($DMTCP_REMLAUNCH_NODES - 1))\n"
1525 " for i in $(seq 0 $bound); do\n"
1526 " eval \"val=\\${DMTCP_REMLAUNCH_${i}_SLOTS}\"\n"
1527 " arguments=$arguments\" DMTCP_REMLAUNCH_${i}_SLOTS=\\\"$val\\\"\"\n"
1528 " bound2=$(($val - 1))\n"
1529 " for j in $(seq 0 $bound2); do\n"
1530 " eval \"ckpts=\\${DMTCP_REMLAUNCH_${i}_${j}}\"\n"
1531 " arguments=$arguments\" DMTCP_REMLAUNCH_${i}_${j}=\\\"$ckpts\\\"\"\n"
1532 " done\n"
1533 " done\n"
1534 " pbsdsh -u \"$llaunch\" \"$arguments\"\n"
1535 " exit 0\n"
1536 " fi\n"
1537 " fi\n"
1538 "fi\n"
1539 "\n\n"
1540 );
1541
1542 fprintf ( fp, "%s", theRestartScriptMultiHostProcessing );
1543 }
1544
1545 fclose ( fp );
1546 {
1547 string filename = RESTART_SCRIPT_BASENAME "." RESTART_SCRIPT_EXT;
1548 string dirname = jalib::Filesystem::DirName(uniqueFilename);
1549 int dirfd = open(dirname.c_str(), O_DIRECTORY | O_RDONLY);
1550 JASSERT(dirfd != -1) (dirname) (JASSERT_ERRNO);
1551
1552 /* Set execute permission for user. */
1553 struct stat buf;
1554 JASSERT(stat(uniqueFilename.c_str(), &buf) == 0);
1555 JASSERT(chmod(uniqueFilename.c_str(), buf.st_mode | S_IXUSR) == 0);
1556 // Create a symlink from
1557 // dmtcp_restart_script.sh -> dmtcp_restart_script_<curCompId>.sh
1558 unlink(filename.c_str());
1559 JTRACE("linking \"dmtcp_restart_script.sh\" filename to uniqueFilename")
1560 (filename) (dirname) (uniqueFilename);
1561 // FIXME: Handle error case of symlink()
1562 JWARNING(symlinkat(uniqueFilename.c_str(), dirfd, filename.c_str()) == 0);
1563 JASSERT(close(dirfd) == 0);
1564 }
1565 _restartFilenames.clear();
1566 }
1567
1568 static void signalHandler(int signum)
1569 {
1570 if (signum == SIGINT) {
1571 prog.handleUserCommand('q');
1572 } else if (signum == SIGALRM) {
1573 timerExpired = true;
1574 } else {
1575 JASSERT(false) .Text("Not reached");
1576 }
1577 }
1578
1579 static void setupSignalHandlers()
1580 {
1581 struct sigaction action;
1582 sigemptyset(&action.sa_mask);
1583 action.sa_flags = 0;
1584 action.sa_handler = signalHandler;
1585
1586 sigaction(SIGINT, &action, NULL);
1587 sigaction(SIGALRM, &action, NULL);
1588 }
1589
1590 // This code is also copied to ssh.cpp:updateCoordHost()
/*
 * Determine the IPv4 address of this host and store it in localhostIPAddr;
 * also record the host name in coordHostname.
 *
 * The host name is resolved with getaddrinfo().  Each returned address is
 * reverse-resolved, and an address is accepted when the resulting name and
 * the host name share a prefix (either one starting with the other).  If
 * several addresses qualify, the last one wins.  If none qualifies,
 * localhostIPAddr stays zeroed and a warning is logged.  If getaddrinfo()
 * itself fails, 127.0.0.1 is used.
 */
static void calcLocalAddr()
{
  char hostname[HOST_NAME_MAX];
  JASSERT(gethostname(hostname, sizeof hostname) == 0) (JASSERT_ERRNO);
  struct addrinfo *result = NULL;
  struct addrinfo *res;
  int error;
  struct addrinfo hints;

  memset(&localhostIPAddr, 0, sizeof localhostIPAddr);
  memset(&hints, 0, sizeof(struct addrinfo));
  hints.ai_family = AF_INET;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE;
  hints.ai_protocol = 0;
  hints.ai_canonname = NULL;
  hints.ai_addr = NULL;
  hints.ai_next = NULL;

  /* resolve the domain name into a list of addresses */
  error = getaddrinfo(hostname, NULL, &hints, &result);
  if (error == 0) {
    /* loop over all returned results and do inverse lookup */
    bool success = false;
    for (res = result; res != NULL; res = res->ai_next) {
      char name[NI_MAXHOST] = "";
      struct sockaddr_in *s = (struct sockaddr_in*) res->ai_addr;

      error = getnameinfo(res->ai_addr, res->ai_addrlen, name, NI_MAXHOST, NULL, 0, 0);
      if (error != 0) {
        JTRACE("getnameinfo() failed.") (gai_strerror(error));
        continue;
      }
      if (Util::strStartsWith(name, hostname) ||
          Util::strStartsWith(hostname, name)) {
        JASSERT(sizeof localhostIPAddr == sizeof s->sin_addr);
        success = true;
        memcpy(&localhostIPAddr, &s->sin_addr, sizeof s->sin_addr);
      }
    }
    // The list returned by getaddrinfo() is dynamically allocated and must
    // be released by the caller.
    freeaddrinfo(result);

    if (!success) {
      // JWARNING takes a condition as its argument; a string literal there
      // is always true, so the message has to be attached with .Text().
      JWARNING(false) (hostname)
        .Text("Failed to find coordinator IP address. DMTCP may fail.");
    }
  } else {
    if (error == EAI_SYSTEM) {
      perror("getaddrinfo");
    } else {
      JTRACE("Error in getaddrinfo") (gai_strerror(error));
    }
    inet_aton("127.0.0.1", &localhostIPAddr);
  }
  coordHostname = hostname;
}
1645
1646 static void resetCkptTimer()
1647 {
1648 alarm(theCheckpointInterval);
1649 }
1650
1651 void DmtcpCoordinator::updateCheckpointInterval(uint32_t interval)
1652 {
1653 if ( interval != DMTCPMESSAGE_SAME_CKPT_INTERVAL &&
1654 interval != theCheckpointInterval) {
1655 int oldInterval = theCheckpointInterval;
1656 theCheckpointInterval = interval;
1657 JNOTE ( "CheckpointInterval updated (for this computation only)" )
1658 ( oldInterval ) ( theCheckpointInterval );
1659 resetCkptTimer();
1660 }
1661 }
1662
1663 void DmtcpCoordinator::eventLoop(bool daemon)
1664 {
1665 struct epoll_event ev;
1666 epollFd = epoll_create(MAX_EVENTS);
1667 JASSERT(epollFd != -1) (JASSERT_ERRNO);
1668
1669 ev.events = EPOLLIN;
1670 ev.data.ptr = listenSock;
1671 JASSERT(epoll_ctl(epollFd, EPOLL_CTL_ADD, listenSock->sockfd(), &ev) != -1)
1672 (JASSERT_ERRNO);
1673
1674 if (!daemon &&
1675 // epoll_ctl below fails if STDIN is pointing to /dev/null.
1676 // Not sure why.
1677 jalib::Filesystem::GetDeviceName(0) != "/dev/null" &&
1678 jalib::Filesystem::GetDeviceName(0) != "/dev/zero" &&
1679 jalib::Filesystem::GetDeviceName(0) != "/dev/random") {
1680 ev.events = EPOLLIN;
1681 #ifdef EPOLLRDHUP
1682 ev.events |= EPOLLRDHUP;
1683 #endif
1684 ev.data.ptr = (void*) STDIN_FILENO;
1685 JASSERT(epoll_ctl(epollFd, EPOLL_CTL_ADD, STDIN_FILENO, &ev) != -1)
1686 (JASSERT_ERRNO);
1687 }
1688
1689 while (true) {
1690 // Wait until either there is some activity on client sockets, or the timer
1691 // has expired.
1692 int nfds = epoll_wait(epollFd, events, MAX_EVENTS, -1);
1693
1694 // The ckpt timer has expired; it's time to checkpoint.
1695 if (nfds == -1 && errno == EINTR && timerExpired) {
1696 timerExpired = false;
1697 startCheckpoint();
1698 continue;
1699 }
1700
1701 JASSERT(nfds != -1) (JASSERT_ERRNO);
1702
1703 for (int n = 0; n < nfds; ++n) {
1704 void *ptr = events[n].data.ptr;
1705 if ((events[n].events & EPOLLHUP) ||
1706 #ifdef EPOLLRDHUP
1707 (events[n].events & EPOLLRDHUP) ||
1708 #endif
1709 (events[n].events & EPOLLERR)) {
1710 JASSERT(ptr != listenSock);
1711 if (ptr == (void*) STDIN_FILENO) {
1712 JASSERT(epoll_ctl(epollFd, EPOLL_CTL_DEL, STDIN_FILENO, &ev) != -1)
1713 (JASSERT_ERRNO);
1714 close(STDIN_FD);
1715 } else {
1716 onDisconnect((CoordClient*)ptr);
1717 }
1718 } else if (events[n].events & EPOLLIN) {
1719 if (ptr == (void*) listenSock) {
1720 onConnect();
1721 } else if (ptr == (void*) STDIN_FILENO) {
1722 char buf[1];
1723 int ret = Util::readAll(STDIN_FD, buf, sizeof(buf));
1724 JASSERT(ret != -1) (JASSERT_ERRNO);
1725 if (ret > 0) {
1726 handleUserCommand(buf[0]);
1727 } else {
1728 JNOTE("closing stdin");
1729 JASSERT(epoll_ctl(epollFd, EPOLL_CTL_DEL, STDIN_FILENO, &ev) != -1)
1730 (JASSERT_ERRNO);
1731 close(STDIN_FD);
1732 }
1733 } else {
1734 onData((CoordClient*)ptr);
1735 }
1736 }
1737 }
1738 }
1739 }
1740
1741 void DmtcpCoordinator::addDataSocket(CoordClient *client)
1742 {
1743 struct epoll_event ev;
1744
1745 #ifdef EPOLLRDHUP
1746 ev.events = EPOLLIN | EPOLLRDHUP;
1747 #else
1748 ev.events = EPOLLIN;
1749 #endif
1750 ev.data.ptr = client;
1751 JASSERT(epoll_ctl(epollFd, EPOLL_CTL_ADD, client->sock().sockfd(), &ev) != -1)
1752 (JASSERT_ERRNO);
1753 }
1754
// Drop the first remaining command-line argument: decrement argc and advance
// argv.  Wrapped in do { } while (0) so that "shift;" is a single statement
// and both updates stay together even under an unbraced if/else.
#define shift do { argc--; argv++; } while (0)
1756
1757 int main ( int argc, char** argv )
1758 {
1759 initializeJalib();
1760
1761 //parse port
1762 thePort = DEFAULT_PORT;
1763 const char* portStr = getenv( ENV_VAR_NAME_PORT );
1764 if ( portStr == NULL ) portStr = getenv("DMTCP_PORT"); // deprecated
1765 if ( portStr != NULL ) thePort = jalib::StringToInt( portStr );
1766
1767 bool daemon = false;
1768
1769 char * tmpdir_arg = NULL;
1770
1771 shift;
1772 while(argc > 0){
1773 string s = argv[0];
1774 if(s=="-h" || s=="--help"){
1775 printf("%s", theUsage);
1776 return 1;
1777 } else if ((s=="--version") && argc==1){
1778 printf("%s", DMTCP_VERSION_AND_COPYRIGHT_INFO);
1779 return 1;
1780 }else if(s == "-q" || s == "--quiet"){
1781 // TODO(kapil): Ignored for now. Remove in later versions.
1782 shift;
1783 }else if(s=="--exit-on-last"){
1784 exitOnLast = true;
1785 shift;
1786 }else if(s=="--exit-after-ckpt"){
1787 exitAfterCkpt = true;
1788 shift;
1789 }else if(s=="--daemon"){
1790 daemon = true;
1791 shift;
1792 } else if (s == "-i" || s == "--interval") {
1793 setenv(ENV_VAR_CKPT_INTR, argv[1], 1);
1794 shift; shift;
1795 } else if (argv[0][0] == '-' && argv[0][1] == 'i' &&
1796 isdigit(argv[0][2])) { // else if -i5, for example
1797 setenv(ENV_VAR_CKPT_INTR, argv[0]+2, 1);
1798 shift;
1799 } else if (argc>1 && (s == "-p" || s == "--port" || s == "--coord-port")) {
1800 thePort = jalib::StringToInt( argv[1] );
1801 shift; shift;
1802 } else if (argv[0][0] == '-' && argv[0][1] == 'p' &&
1803 isdigit(argv[0][2])) { // else if -p0, for example
1804 thePort = jalib::StringToInt( argv[0]+2 );
1805 shift;
1806 }else if(argc>1 && s == "--port-file"){
1807 thePortFile = argv[1];
1808 shift; shift;
1809 }else if(argc>1 && (s == "-c" || s == "--ckptdir")){
1810 setenv(ENV_VAR_CHECKPOINT_DIR, argv[1], 1);
1811 shift; shift;
1812 }else if(argc>1 && (s == "-t" || s == "--tmpdir")){
1813 tmpdir_arg = argv[1];
1814 shift; shift;
1815 }else if(argc == 1){ //last arg can be port
1816 char *endptr;
1817 long x = strtol(argv[0], &endptr, 10);
1818 if ((ssize_t)strlen(argv[0]) != endptr - argv[0]) {
1819 fprintf(stderr, theUsage, DEFAULT_PORT);
1820 return 1;
1821 } else {
1822 thePort = jalib::StringToInt( argv[0] );
1823 shift;
1824 }
1825 x++, x--; // to suppress unused variable warning
1826 }else{
1827 fprintf(stderr, theUsage, DEFAULT_PORT);
1828 return 1;
1829 }
1830 }
1831
1832 tmpDir = Util::calcTmpDir(tmpdir_arg);
1833 Util::initializeLogFile(tmpDir);
1834
1835 JTRACE ( "New DMTCP coordinator starting." )
1836 ( UniquePid::ThisProcess() );
1837
1838 if ( thePort < 0 )
1839 {
1840 fprintf(stderr, theUsage, DEFAULT_PORT);
1841 return 1;
1842 }
1843
1844 calcLocalAddr();
1845
1846 if (getenv(ENV_VAR_CHECKPOINT_DIR) != NULL) {
1847 ckptDir = getenv(ENV_VAR_CHECKPOINT_DIR);
1848 } else {
1849 ckptDir = get_current_dir_name();
1850 }
1851
1852 /*Test if the listener socket is already open*/
1853 if ( fcntl(PROTECTED_COORD_FD, F_GETFD) != -1 ) {
1854 listenSock = new jalib::JServerSocket ( PROTECTED_COORD_FD );
1855 JASSERT ( listenSock->port() != -1 ) .Text ( "Invalid listener socket" );
1856 JTRACE ( "Using already created listener socker" ) ( listenSock->port() );
1857 } else {
1858
1859 errno = 0;
1860 listenSock = new jalib::JServerSocket(jalib::JSockAddr::ANY, thePort, 128);
1861 JASSERT ( listenSock->isValid() ) ( thePort ) ( JASSERT_ERRNO )
1862 .Text ( "Failed to create listen socket."
1863 "\nIf msg is \"Address already in use\", this may be an old coordinator."
1864 "\nKill default coordinator and try again: dmtcp_command -q"
1865 "\nIf that fails, \"pkill -9 dmtcp_coord\","
1866 " and try again in a minute or so." );
1867 }
1868
1869 thePort = listenSock->port();
1870 if (!thePortFile.empty()) {
1871 Util::writeCoordPortToFile(thePort, thePortFile.c_str());
1872 }
1873
1874 //parse checkpoint interval
1875 const char* interval = getenv ( ENV_VAR_CKPT_INTR );
1876 if ( interval != NULL ) {
1877 theDefaultCheckpointInterval = jalib::StringToInt ( interval );
1878 theCheckpointInterval = theDefaultCheckpointInterval;
1879 }
1880
1881 #if 0
1882 JASSERT_STDERR <<
1883 "dmtcp_coordinator starting..." <<
1884 "\n Port: " << thePort <<
1885 "\n Checkpoint Interval: ";
1886 if(theCheckpointInterval==0)
1887 JASSERT_STDERR << "disabled (checkpoint manually instead)";
1888 else
1889 JASSERT_STDERR << theCheckpointInterval;
1890 JASSERT_STDERR <<
1891 "\n Exit on last client: " << exitOnLast << "\n";
1892 #else
1893
1894 fprintf(stderr, "dmtcp_coordinator starting..."
1895 "\n Host: %s (%s)"
1896 "\n Port: %d"
1897 "\n Checkpoint Interval: ",
1898 coordHostname.c_str(), inet_ntoa(localhostIPAddr), thePort);
1899 if(theCheckpointInterval==0)
1900 fprintf(stderr, "disabled (checkpoint manually instead)");
1901 else
1902 fprintf(stderr, "%d", theCheckpointInterval);
1903 fprintf(stderr, "\n Exit on last client: %d\n", exitOnLast);
1904 #endif
1905
1906 if (daemon) {
1907 JASSERT_STDERR << "Backgrounding...\n";
1908 int fd = open("/dev/null", O_RDWR);
1909 JASSERT(dup2(fd, STDIN_FILENO) == STDIN_FILENO);
1910 JASSERT(dup2(fd, STDOUT_FILENO) == STDOUT_FILENO);
1911 JASSERT(dup2(fd, STDERR_FILENO) == STDERR_FILENO);
1912 JASSERT_CLOSE_STDERR();
1913 if (fd > STDERR_FILENO) {
1914 close(fd);
1915 }
1916
1917 if (fork() > 0) {
1918 JTRACE ( "Parent Exiting after fork()" );
1919 exit(0);
1920 }
1921 //pid_t sid = setsid();
1922 } else {
1923 JASSERT_STDERR <<
1924 "Type '?' for help." <<
1925 "\n\n";
1926 }
1927
1928 /* We set up the signal handler for SIGINT and SIGALRM.
1929 * SIGINT is used to send DMT_KILL_PEER message to all the connected peers
1930 * before exiting.
1931 * SIGALRM is used for interval checkpointing.
1932 */
1933 setupSignalHandlers();
1934
1935 /* If the coordinator was started transparently by dmtcp_launch, then we
1936 * want to block signals, such as SIGINT. To see why this is important:
1937 * % gdb dmtcp_launch a.out
1938 * (gdb) run
1939 * ^C # Stop gdb to get its attention, and continue debugging.
1940 * # The above scenario causes the SIGINT to go to a.out and its child,
1941 * # the dmtcp_coordinator. The coord then triggers the SIGINT handler,
1942 * # which sends DMT_KILL_PEER to kill a.out.
1943 */
1944 if ( exitOnLast && daemon ) {
1945 sigset_t set;
1946 sigfillset(&set);
1947 // sigprocmask is only per-thread; but the coordinator is single-threaded.
1948 sigprocmask(SIG_BLOCK, &set, NULL);
1949 }
1950
1951 prog.eventLoop(daemon);
1952 return 0;
1953 }