Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent hanging on waiting for timeout time #477

Merged
merged 5 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 65 additions & 38 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag
// federate that is far ahead of other upstream federates in logical time.
lf_update_max_level(_fed.last_TAG, _fed.is_last_TAG_provisional);
lf_cond_broadcast(&lf_port_status_changed);
lf_cond_broadcast(&env->event_q_changed);
} else {
// Message arrivals should be monotonic, so this should not occur.
lf_print_warning("Attempt to update the last known status tag "
Expand All @@ -290,6 +291,34 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag
}
}

/**
 * @brief Record that every network input fed by the given federate is absent at all future tags.
 *
 * Under decentralized coordination, this walks the action table and, for each entry whose
 * `source_id` equals `fed_id`, sets the last known status tag of the corresponding input
 * port to FOREVER_TAG. The mutex of the environment returned by `_lf_get_environments` is
 * held for the duration of the scan.
 *
 * When FEDERATED_DECENTRALIZED is not defined, this function has no effect.
 *
 * @param fed_id The ID of the federate whose inputs should be marked absent.
 */
static void mark_inputs_known_absent(int fed_id) {
#ifndef FEDERATED_DECENTRALIZED
  // Not decentralized: nothing to update. The cast silences the unused-parameter diagnostic.
  (void)fed_id;
#else
  // NOTE: Once transient federates are supported, this logic must be revisited, because a
  // federate that has gone away could later rejoin.
  environment_t* top_env;
  _lf_get_environments(&top_env);

  LF_MUTEX_LOCK(&top_env->mutex);
  for (size_t port = 0; port < _lf_action_table_size; port++) {
    lf_action_base_t* input_action = _lf_action_table[port];
    if (input_action->source_id != fed_id) {
      // This input comes from a different federate; leave its status alone.
      continue;
    }
    update_last_known_status_on_input_port(top_env, FOREVER_TAG, port);
  }
  LF_MUTEX_UNLOCK(&top_env->mutex);
#endif // FEDERATED_DECENTRALIZED
}

/**
* Set the status of network port with id portID.
*
Expand Down Expand Up @@ -733,46 +762,46 @@ static void* listen_to_federates(void* _args) {
bool socket_closed = false;
// Read one byte to get the message type.
LF_PRINT_DEBUG("Waiting for a P2P message on socket %d.", *socket_id);
bool bad_message = false;
if (read_from_socket_close_on_error(socket_id, 1, buffer)) {
// Socket has been closed.
lf_print("Socket from federate %d is closed.", fed_id);
// Stop listening to this federate.
socket_closed = true;
break;
}
LF_PRINT_DEBUG("Received a P2P message on socket %d of type %d.", *socket_id, buffer[0]);
bool bad_message = false;
switch (buffer[0]) {
case MSG_TYPE_P2P_MESSAGE:
LF_PRINT_LOG("Received untimed message from federate %d.", fed_id);
if (handle_message(socket_id, fed_id)) {
// Failed to complete the reading of a message on a physical connection.
lf_print_warning("Failed to complete reading of message on physical connection.");
socket_closed = true;
}
break;
case MSG_TYPE_P2P_TAGGED_MESSAGE:
LF_PRINT_LOG("Received tagged message from federate %d.", fed_id);
if (handle_tagged_message(socket_id, fed_id)) {
// P2P tagged messages are only used in decentralized coordination, and
// it is not a fatal error if the socket is closed before the whole message is read.
// But this thread should exit.
lf_print_warning("Failed to complete reading of tagged message.");
socket_closed = true;
}
break;
case MSG_TYPE_PORT_ABSENT:
LF_PRINT_LOG("Received port absent message from federate %d.", fed_id);
if (handle_port_absent_message(socket_id, fed_id)) {
// P2P tagged messages are only used in decentralized coordination, and
// it is not a fatal error if the socket is closed before the whole message is read.
// But this thread should exit.
lf_print_warning("Failed to complete reading of tagged message.");
socket_closed = true;
} else {
LF_PRINT_DEBUG("Received a P2P message on socket %d of type %d.", *socket_id, buffer[0]);
switch (buffer[0]) {
case MSG_TYPE_P2P_MESSAGE:
LF_PRINT_LOG("Received untimed message from federate %d.", fed_id);
if (handle_message(socket_id, fed_id)) {
// Failed to complete the reading of a message on a physical connection.
lf_print_warning("Failed to complete reading of message on physical connection.");
socket_closed = true;
}
break;
case MSG_TYPE_P2P_TAGGED_MESSAGE:
LF_PRINT_LOG("Received tagged message from federate %d.", fed_id);
if (handle_tagged_message(socket_id, fed_id)) {
// P2P tagged messages are only used in decentralized coordination, and
// it is not a fatal error if the socket is closed before the whole message is read.
// But this thread should exit.
lf_print_warning("Failed to complete reading of tagged message.");
socket_closed = true;
}
break;
case MSG_TYPE_PORT_ABSENT:
LF_PRINT_LOG("Received port absent message from federate %d.", fed_id);
if (handle_port_absent_message(socket_id, fed_id)) {
// P2P tagged messages are only used in decentralized coordination, and
// it is not a fatal error if the socket is closed before the whole message is read.
// But this thread should exit.
lf_print_warning("Failed to complete reading of tagged message.");
socket_closed = true;
}
break;
default:
bad_message = true;
}
break;
default:
bad_message = true;
}
if (bad_message) {
lf_print_error("Received erroneous message type: %d. Closing the socket.", buffer[0]);
Expand All @@ -781,12 +810,10 @@ static void* listen_to_federates(void* _args) {
break; // while loop
}
if (socket_closed) {
// NOTE: For decentralized execution, once this socket is closed, we could
// For decentralized execution, once this socket is closed, we
// update last known tags of all ports connected to the specified federate to FOREVER_TAG,
// which would eliminate the need to wait for STAA to assume an input is absent.
// However, at this time, we don't know which ports correspond to which upstream federates.
// The code generator would have to encode this information. Once that is done,
// we could call update_last_known_status_on_input_port with FOREVER_TAG.
mark_inputs_known_absent(fed_id);

break; // while loop
}
Expand Down
1 change: 1 addition & 0 deletions include/core/lf_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ typedef struct {
trigger_t* trigger; // THIS HAS TO MATCH lf_action_internal_t
self_base_t* parent;
bool has_value;
int source_id; // Used only for federated network input actions.
} lf_action_base_t;

/**
Expand Down
2 changes: 1 addition & 1 deletion lingua-franca-ref.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
master
decentralized-timeout
1 change: 1 addition & 0 deletions python/include/python_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ typedef struct {
lf_action_internal_t _base;
self_base_t* parent;
bool has_value;
int source_id;
PyObject* value;
FEDERATED_GENERIC_EXTENSION
} generic_action_instance_struct;
Expand Down
Loading