Commit 568049d0 authored by Lucas Russo's avatar Lucas Russo

src/sm_io/sm_io_bootstrap.c: fix creation/destruction of CZMQ actors/pipes

Our actors/pipes were not terminating properly due to asynchronous
behaviour of actors/pipes. Now, a proper handshake accurs between
DEVIO and actor/pipes SMIOs and they terminate gracefully.
parent 5fea2e3a
......@@ -155,7 +155,16 @@ void smio_config_defaults (zsock_t *pipe, void *args)
smio_service, th_args->log_file);
/* We've finished configuring the SMIO. Tell DEVIO we are done */
zsock_signal (pipe, 0);
char *smio_service_suffix = hutils_concat_strings_no_sep (
smio_mod_dispatch[th_args->smio_id].name, inst_id_str);
ASSERT_ALLOC(smio_service_suffix, err_smio_service_suffix_alloc);
DBE_DEBUG (DBG_SM_IO | DBG_LVL_INFO, "[sm_io_bootstrap] Sending CONFIG DONE message over PIPE\n");
int zerr = zstr_sendx (pipe, smio_service_suffix, "CONFIG DONE", NULL);
ASSERT_TEST (zerr >= 0, "Config thread could not send CONFIG DONE message "
"over PIPE. Destroying ourselves", err_send_config_done);
DBE_DEBUG (DBG_SM_IO | DBG_LVL_INFO, "[sm_io_bootstrap] Config Thread %s "
"sent CONFIG DONE over PIPE\n", smio_service);
/* Wait for $TERM message from DEVIO to end */
bool terminated = false;
......@@ -169,7 +178,14 @@ void smio_config_defaults (zsock_t *pipe, void *args)
if (streq (command, "$TERM")) {
terminated = true;
}
/* Invalid message received. Log the error, but continue normally */
else {
DBE_DEBUG (DBG_SM_IO | DBG_LVL_WARN, "[sm_io_bootstrap] Config Thread %s "
"received an invalid command over PIPE\n", smio_service);
goto err_pipe_mgmt_bad_msg;
}
err_pipe_mgmt_bad_msg:
free (command);
zmsg_destroy (&msg);
}
......@@ -177,6 +193,9 @@ void smio_config_defaults (zsock_t *pipe, void *args)
DBE_DEBUG (DBG_SM_IO | DBG_LVL_INFO, "[sm_io_bootstrap] Config Thread %s "
"terminating with %s\n", smio_service, (terminated)? "success" : "error");
err_smio_service_suffix_alloc:
free (smio_service_suffix);
err_send_config_done:
free (smio_service);
err_smio_service_alloc:
free (inst_id_str);
......@@ -276,6 +295,7 @@ static struct _smio_t *_smio_new (th_boot_args_t *args, zsock_t *pipe_mgmt,
err_mlm_connect:
mlm_client_destroy (&self->worker);
err_worker_alloc:
zsock_destroy (&self->pipe_msg);
disp_table_destroy (&self->exp_ops_dtable);
err_exp_ops_dtable_alloc:
free (self->service);
......@@ -293,6 +313,7 @@ static smio_err_e _smio_destroy (struct _smio_t **self_p)
struct _smio_t *self = *self_p;
mlm_client_destroy (&self->worker);
zsock_destroy (&self->pipe_msg);
disp_table_destroy (&self->exp_ops_dtable);
self->thsafe_client_ops = NULL;
self->ops = NULL;
......@@ -308,79 +329,34 @@ static smio_err_e _smio_destroy (struct _smio_t **self_p)
static smio_err_e _smio_loop (smio_t *self)
{
assert (self);
DBE_DEBUG (DBG_SM_IO | DBG_LVL_TRACE,
"[sm_io_bootstrap] Main loop starting\n");
smio_err_e err = SMIO_SUCCESS;
bool terminated = false;
void *pipe_mgmt_zmq_socket = NULL;
void *worker_zmq_socket = NULL;
pipe_mgmt_zmq_socket = zsock_resolve (self->pipe_mgmt);
ASSERT_TEST (pipe_mgmt_zmq_socket != NULL, "Invalid PIPE Management socket reference",
err_inv_pipe_mgmt_socket, SMIO_ERR_INV_SOCKET);
worker_zmq_socket = zactor_resolve (mlm_client_actor (self->worker));
ASSERT_TEST (worker_zmq_socket != NULL, "Invalid WORKER socket reference",
/* We send/recv messages through MLM msgpipe */
zsock_t *worker_msgpipe = mlm_client_msgpipe (self->worker);
ASSERT_TEST (worker_msgpipe != NULL, "Invalid WORKER socket reference",
err_inv_worker_socket, SMIO_ERR_INV_SOCKET);
zpoller_t *poller = zpoller_new (worker_msgpipe, self->pipe_mgmt, NULL);
ASSERT_TEST (poller != NULL, "Could not Initialize poller",
err_init_poller, SMIO_ERR_INV_SOCKET);
/* Begin infinite polling on Malamute/PIPE socket
* and exit if the parent send a message through
* the pipe socket */
while (!terminated) {
/* Listen to WORKER (requests from clients) and PIPE (managment) sockets */
zmq_pollitem_t items [] = {
[SMIO_PIPE_MGMT_SOCK] = {
.socket = pipe_mgmt_zmq_socket,
.fd = 0,
.events = ZMQ_POLLIN,
.revents = 0
},
[SMIO_MLM_SOCK] = {
.socket = worker_zmq_socket,
.fd = 0,
.events = ZMQ_POLLIN,
.revents = 0
}
};
/* Wait up to 100 ms */
int rc = zmq_poll (items, SMIO_SOCKS_NUM, SMIO_POLLER_TIMEOUT);
ASSERT_TEST(rc != -1, "Poller has been interrupted",
err_loop_interrupted, SMIO_ERR_INTERRUPTED_POLLER);
/* Check for activity on PIPE socket */
if (items [SMIO_PIPE_MGMT_SOCK].revents & ZMQ_POLLIN) {
/* On any activity we destroy ourselves */
zmsg_t *request = zmsg_recv (self->pipe_mgmt);
if (request == NULL) {
err = SMIO_ERR_INTERRUPTED_POLLER;
break; /* Worker has been interrupted */
}
char *command = zmsg_popstr (request);
/* A $TERM message on this means to self-destruct */
if (streq (command, "$TERM")) {
/* Destroy SMIO instance. As we already do this on the main
* smio_startup (), we just need to exit this cleanly */
DBE_DEBUG (DBG_SM_IO | DBG_LVL_WARN,
"[sm_io_bootstrap] Received shutdown message on "
"PIPE socket. Exiting.\n");
terminated = true;
}
else {
DBE_DEBUG (DBG_SM_IO | DBG_LVL_ERR,
"[sm_io_bootstrap] Invalid message received on PIPE socket.\n"
"This was probably supposed to go through another socket\n");
}
free (command);
zmsg_destroy (&request);
}
bool terminated = false;
while (!zsys_interrupted && !terminated) {
/* Poll Message sockets */
zsock_t *which = zpoller_wait (poller, SMIO_POLLER_TIMEOUT);
ASSERT_TEST(which != NULL || zpoller_expired (poller),
"_smio_loop: poller interrupted", err_poller_interrupted,
SMIO_ERR_INTERRUPTED_POLLER);
/* Check for activity on PIPE socket */
if (items [SMIO_MLM_SOCK].revents & ZMQ_POLLIN) {
/* Check for activity on WORKER socket */
if (which == worker_msgpipe) {
zmsg_t *request = mlm_client_recv (self->worker);
if (request == NULL) {
......@@ -405,12 +381,43 @@ static smio_err_e _smio_loop (smio_t *self)
/* Cleanup */
zmsg_destroy (&request);
}
/* Check for activity on PIPE socket */
else if (which == self->pipe_mgmt) {
zmsg_t *request = zmsg_recv (self->pipe_mgmt);
if (request == NULL) {
err = SMIO_ERR_INTERRUPTED_POLLER;
break; /* Worker has been interrupted */
}
char *command = zmsg_popstr (request);
/* A $TERM message on this means to self-destruct */
if (streq (command, "$TERM")) {
/* Destroy SMIO instance. As we already do this on the main
* smio_startup (), we just need to exit this cleanly */
DBE_DEBUG (DBG_SM_IO | DBG_LVL_WARN,
"[sm_io_bootstrap] Received shutdown message on "
"PIPE socket. Exiting.\n");
terminated = true;
}
/* Invalid message received. Log the error, but continue normally */
else {
DBE_DEBUG (DBG_SM_IO | DBG_LVL_WARN,
"[sm_io_bootstrap] Invalid message received on PIPE "
"socket.\n");
err = SMIO_ERR_BAD_MSG;
goto err_pipe_mgmt_bad_msg;
}
err_pipe_mgmt_bad_msg:
free (command);
zmsg_destroy (&request);
}
}
err_loop_interrupted:
err_poller_interrupted:
err_init_poller:
err_inv_worker_socket:
err_inv_pipe_mgmt_socket:
return err;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment