v1.0.4 - Fixed web socket limitation with the number of npubs in a subscription

This commit is contained in:
Your Name
2025-11-07 19:59:34 -05:00
parent 3792649ed9
commit b276b44ded
7 changed files with 1496 additions and 9 deletions

View File

@@ -471,18 +471,717 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
case LWS_CALLBACK_RECEIVE:
if (len > 0) {
DEBUG_TRACE("LWS_CALLBACK_RECEIVE: received %zu bytes", len);
// Check if client is rate limited for malformed requests
if (is_client_rate_limited_for_malformed_requests(pss)) {
send_notice_message(wsi, "error: too many malformed requests - temporarily blocked");
return 0;
}
// Check if this is a fragmented message
int is_first_fragment = lws_is_first_fragment(wsi);
int is_final_fragment = lws_is_final_fragment(wsi);
size_t remaining_payload = lws_remaining_packet_payload(wsi);
DEBUG_TRACE("Fragment info: first=%d, final=%d, remaining=%zu, reassembly_active=%d",
is_first_fragment, is_final_fragment, remaining_payload, pss->reassembly_active);
// Handle message reassembly for fragmented messages
// Only use reassembly if message is actually fragmented (not both first and final)
int is_fragmented = (is_first_fragment && !is_final_fragment) || pss->reassembly_active;
if (is_fragmented) {
// Start or continue reassembly
if (is_first_fragment) {
// First fragment - initialize reassembly buffer
if (pss->reassembly_buffer) {
DEBUG_WARN("Starting new reassembly but buffer already exists - cleaning up");
free(pss->reassembly_buffer);
}
pss->reassembly_buffer = NULL;
pss->reassembly_size = 0;
pss->reassembly_capacity = 0;
pss->reassembly_active = 1;
DEBUG_TRACE("Starting message reassembly");
}
// Ensure buffer has enough capacity
size_t needed_capacity = pss->reassembly_size + len + 1; // +1 for null terminator
if (needed_capacity > pss->reassembly_capacity) {
size_t new_capacity = pss->reassembly_capacity == 0 ? 8192 : pss->reassembly_capacity * 2;
while (new_capacity < needed_capacity) {
new_capacity *= 2;
}
char* new_buffer = realloc(pss->reassembly_buffer, new_capacity);
if (!new_buffer) {
DEBUG_ERROR("Failed to allocate reassembly buffer (capacity %zu)", new_capacity);
// Clean up and abort reassembly
free(pss->reassembly_buffer);
pss->reassembly_buffer = NULL;
pss->reassembly_size = 0;
pss->reassembly_capacity = 0;
pss->reassembly_active = 0;
send_notice_message(wsi, "error: message too large - memory allocation failed");
return 0;
}
pss->reassembly_buffer = new_buffer;
pss->reassembly_capacity = new_capacity;
DEBUG_TRACE("Expanded reassembly buffer to %zu bytes", new_capacity);
}
// Append fragment to buffer
memcpy(pss->reassembly_buffer + pss->reassembly_size, in, len);
pss->reassembly_size += len;
// Check if this is the final fragment
if (is_final_fragment) {
// Message complete - process it
pss->reassembly_buffer[pss->reassembly_size] = '\0';
pss->reassembly_active = 0;
DEBUG_TRACE("Message reassembly complete: total size %zu bytes", pss->reassembly_size);
// Process the complete message
char* complete_message = pss->reassembly_buffer;
size_t message_len = pss->reassembly_size;
// Reset reassembly state (but keep buffer for reuse if needed)
pss->reassembly_size = 0;
// Parse JSON message
DEBUG_TRACE("Parsing reassembled JSON message of length %zu", message_len);
cJSON* json = cJSON_Parse(complete_message);
// Process the message (same logic as before)
if (json && cJSON_IsArray(json)) {
// Get message type
cJSON* type = cJSON_GetArrayItem(json, 0);
if (type && cJSON_IsString(type)) {
const char* msg_type = cJSON_GetStringValue(type);
if (strcmp(msg_type, "EVENT") == 0) {
// Handle EVENT message
cJSON* event = cJSON_GetArrayItem(json, 1);
if (event && cJSON_IsObject(event)) {
// Extract event JSON string for unified validator
char *event_json_str = cJSON_Print(event);
if (!event_json_str) {
DEBUG_ERROR("Failed to serialize event JSON for validation");
cJSON* error_response = cJSON_CreateArray();
cJSON_AddItemToArray(error_response, cJSON_CreateString("OK"));
cJSON_AddItemToArray(error_response, cJSON_CreateString("unknown"));
cJSON_AddItemToArray(error_response, cJSON_CreateBool(0));
cJSON_AddItemToArray(error_response, cJSON_CreateString("error: failed to process event"));
char *error_str = cJSON_Print(error_response);
if (error_str) {
size_t error_len = strlen(error_str);
// Use proper message queue system instead of direct lws_write
if (queue_message(wsi, pss, error_str, error_len, LWS_WRITE_TEXT) != 0) {
DEBUG_ERROR("Failed to queue error response message");
}
free(error_str);
}
cJSON_Delete(error_response);
cJSON_Delete(json);
// Note: complete_message points to reassembly_buffer, which is managed separately
// and should not be freed here - it will be cleaned up in LWS_CALLBACK_CLOSED
return 0;
}
// Call unified validator with JSON string
size_t event_json_len = strlen(event_json_str);
int validation_result = nostr_validate_unified_request(event_json_str, event_json_len);
// Map validation result to old result format (0 = success, -1 = failure)
int result = (validation_result == NOSTR_SUCCESS) ? 0 : -1;
// Generate error message based on validation result
char error_message[512] = {0};
if (result != 0) {
switch (validation_result) {
case NOSTR_ERROR_INVALID_INPUT:
strncpy(error_message, "invalid: malformed event structure", sizeof(error_message) - 1);
break;
case NOSTR_ERROR_EVENT_INVALID_SIGNATURE:
strncpy(error_message, "invalid: signature verification failed", sizeof(error_message) - 1);
break;
case NOSTR_ERROR_EVENT_INVALID_ID:
strncpy(error_message, "invalid: event id verification failed", sizeof(error_message) - 1);
break;
case NOSTR_ERROR_EVENT_INVALID_PUBKEY:
strncpy(error_message, "invalid: invalid pubkey format", sizeof(error_message) - 1);
break;
case -103: // NOSTR_ERROR_EVENT_EXPIRED
strncpy(error_message, "rejected: event expired", sizeof(error_message) - 1);
break;
case -102: // NOSTR_ERROR_NIP42_DISABLED
strncpy(error_message, "auth-required: NIP-42 authentication required", sizeof(error_message) - 1);
break;
case -101: // NOSTR_ERROR_AUTH_REQUIRED
strncpy(error_message, "blocked: pubkey not authorized", sizeof(error_message) - 1);
break;
default:
strncpy(error_message, "error: validation failed", sizeof(error_message) - 1);
break;
}
}
// Cleanup event JSON string
free(event_json_str);
// Check for NIP-70 protected events
if (result == 0) {
// Check if event has protected tag ["-"]
int is_protected_event = 0;
cJSON* tags = cJSON_GetObjectItem(event, "tags");
if (tags && cJSON_IsArray(tags)) {
cJSON* tag = NULL;
cJSON_ArrayForEach(tag, tags) {
if (cJSON_IsArray(tag) && cJSON_GetArraySize(tag) >= 1) {
cJSON* tag_name = cJSON_GetArrayItem(tag, 0);
if (tag_name && cJSON_IsString(tag_name) &&
strcmp(cJSON_GetStringValue(tag_name), "-") == 0) {
is_protected_event = 1;
break;
}
}
}
}
if (is_protected_event) {
// Check if protected events are enabled using config
int protected_events_enabled = get_config_bool("nip70_protected_events_enabled", 0);
if (!protected_events_enabled) {
// Protected events not supported
result = -1;
strncpy(error_message, "blocked: protected events not supported", sizeof(error_message) - 1);
error_message[sizeof(error_message) - 1] = '\0';
DEBUG_WARN("Protected event rejected: protected events not enabled");
} else {
// Protected events enabled - check authentication
cJSON* pubkey_obj = cJSON_GetObjectItem(event, "pubkey");
const char* event_pubkey = pubkey_obj ? cJSON_GetStringValue(pubkey_obj) : NULL;
if (!pss || !pss->authenticated ||
!event_pubkey || strcmp(pss->authenticated_pubkey, event_pubkey) != 0) {
// Not authenticated or pubkey mismatch
result = -1;
strncpy(error_message, "auth-required: protected event requires authentication", sizeof(error_message) - 1);
error_message[sizeof(error_message) - 1] = '\0';
DEBUG_WARN("Protected event rejected: authentication required");
}
}
}
}
// Check for admin events (kind 23456) and intercept them
if (result == 0) {
cJSON* kind_obj = cJSON_GetObjectItem(event, "kind");
if (kind_obj && cJSON_IsNumber(kind_obj)) {
int event_kind = (int)cJSON_GetNumberValue(kind_obj);
DEBUG_TRACE("Processing event kind %d, message length: %zu", event_kind, message_len);
// Log reception of Kind 23456 events
if (event_kind == 23456) {
DEBUG_LOG("Admin event (kind 23456) received");
}
if (event_kind == 23456) {
// Enhanced admin event security - check authorization first
char auth_error[512] = {0};
int auth_result = is_authorized_admin_event(event, auth_error, sizeof(auth_error));
if (auth_result != 0) {
// Authorization failed - log and reject
DEBUG_WARN("Admin event authorization failed");
result = -1;
size_t error_len = strlen(auth_error);
size_t copy_len = (error_len < sizeof(error_message) - 1) ? error_len : sizeof(error_message) - 1;
memcpy(error_message, auth_error, copy_len);
error_message[copy_len] = '\0';
} else {
// Authorization successful - process through admin API
char admin_error[512] = {0};
int admin_result = process_admin_event_in_config(event, admin_error, sizeof(admin_error), wsi);
// Log results for Kind 23456 events
if (event_kind == 23456) {
if (admin_result != 0) {
char error_result_msg[512];
if (strlen(admin_error) > 0) {
// Safely truncate admin_error if too long
size_t max_error_len = sizeof(error_result_msg) - 50; // Leave room for prefix
size_t error_len = strlen(admin_error);
if (error_len > max_error_len) {
error_len = max_error_len;
}
char truncated_error[512];
memcpy(truncated_error, admin_error, error_len);
truncated_error[error_len] = '\0';
// Use a safer approach to avoid truncation warning
size_t prefix_len = snprintf(error_result_msg, sizeof(error_result_msg),
"ERROR: Kind %d event processing failed: ", event_kind);
if (prefix_len < sizeof(error_result_msg)) {
size_t remaining = sizeof(error_result_msg) - prefix_len;
size_t copy_len = strlen(truncated_error);
if (copy_len >= remaining) {
copy_len = remaining - 1;
}
memcpy(error_result_msg + prefix_len, truncated_error, copy_len);
error_result_msg[prefix_len + copy_len] = '\0';
}
} else {
snprintf(error_result_msg, sizeof(error_result_msg),
"ERROR: Kind %d event processing failed", event_kind);
}
DEBUG_ERROR(error_result_msg);
}
}
if (admin_result != 0) {
DEBUG_ERROR("Failed to process admin event");
result = -1;
size_t error_len = strlen(admin_error);
size_t copy_len = (error_len < sizeof(error_message) - 1) ? error_len : sizeof(error_message) - 1;
memcpy(error_message, admin_error, copy_len);
error_message[copy_len] = '\0';
} else {
// Admin events are processed by the admin API, not broadcast to subscriptions
}
}
} else if (event_kind == 1059) {
// Check for NIP-17 gift wrap admin messages
char nip17_error[512] = {0};
cJSON* response_event = process_nip17_admin_message(event, nip17_error, sizeof(nip17_error), wsi);
if (!response_event) {
// Check if this is an error or if the command was already handled
if (strlen(nip17_error) > 0) {
// There was an actual error
DEBUG_ERROR("NIP-17 admin message processing failed");
result = -1;
size_t error_len = strlen(nip17_error);
size_t copy_len = (error_len < sizeof(error_message) - 1) ? error_len : sizeof(error_message) - 1;
memcpy(error_message, nip17_error, copy_len);
error_message[copy_len] = '\0';
} else {
// No error message means the command was already handled (plain text commands)
// Store the original gift wrap event in database
if (store_event(event) != 0) {
DEBUG_ERROR("Failed to store gift wrap event in database");
result = -1;
strncpy(error_message, "error: failed to store gift wrap event", sizeof(error_message) - 1);
}
}
} else {
// Store the original gift wrap event in database (unlike kind 23456)
if (store_event(event) != 0) {
DEBUG_ERROR("Failed to store gift wrap event in database");
result = -1;
strncpy(error_message, "error: failed to store gift wrap event", sizeof(error_message) - 1);
cJSON_Delete(response_event);
} else {
// Broadcast RESPONSE event to matching persistent subscriptions
broadcast_event_to_subscriptions(response_event);
// Clean up response event
cJSON_Delete(response_event);
}
}
} else if (event_kind == 14) {
// Check for DM stats commands addressed to relay
char dm_error[512] = {0};
int dm_result = process_dm_stats_command(event, dm_error, sizeof(dm_error), wsi);
if (dm_result != 0) {
DEBUG_ERROR("DM stats command processing failed");
result = -1;
size_t error_len = strlen(dm_error);
size_t copy_len = (error_len < sizeof(error_message) - 1) ? error_len : sizeof(error_message) - 1;
memcpy(error_message, dm_error, copy_len);
error_message[copy_len] = '\0';
} else {
// Store the DM event in database
if (store_event(event) != 0) {
DEBUG_ERROR("Failed to store DM event in database");
result = -1;
strncpy(error_message, "error: failed to store DM event", sizeof(error_message) - 1);
} else {
// Broadcast DM event to matching persistent subscriptions
broadcast_event_to_subscriptions(event);
}
}
} else {
// Check if this is an ephemeral event (kinds 20000-29999)
// Per NIP-01: ephemeral events are broadcast but never stored
if (event_kind >= 20000 && event_kind < 30000) {
DEBUG_TRACE("Ephemeral event (kind %d) - broadcasting without storage", event_kind);
// Broadcast directly to subscriptions without database storage
broadcast_event_to_subscriptions(event);
} else {
DEBUG_TRACE("Storing regular event in database");
// Regular event - store in database and broadcast
if (store_event(event) != 0) {
DEBUG_ERROR("Failed to store event in database");
result = -1;
strncpy(error_message, "error: failed to store event", sizeof(error_message) - 1);
} else {
DEBUG_LOG("Event stored and broadcast (kind %d)", event_kind);
// Broadcast event to matching persistent subscriptions
broadcast_event_to_subscriptions(event);
}
}
}
} else {
// Event without valid kind - try normal storage
DEBUG_WARN("Event without valid kind - trying normal storage");
if (store_event(event) != 0) {
DEBUG_ERROR("Failed to store event without kind in database");
result = -1;
strncpy(error_message, "error: failed to store event", sizeof(error_message) - 1);
} else {
broadcast_event_to_subscriptions(event);
}
}
}
// Send OK response
cJSON* event_id = cJSON_GetObjectItem(event, "id");
if (event_id && cJSON_IsString(event_id)) {
cJSON* response = cJSON_CreateArray();
cJSON_AddItemToArray(response, cJSON_CreateString("OK"));
cJSON_AddItemToArray(response, cJSON_CreateString(cJSON_GetStringValue(event_id)));
cJSON_AddItemToArray(response, cJSON_CreateBool(result == 0));
cJSON_AddItemToArray(response, cJSON_CreateString(strlen(error_message) > 0 ? error_message : ""));
char *response_str = cJSON_Print(response);
if (response_str) {
size_t response_len = strlen(response_str);
// DEBUG: Log WebSocket frame details before sending
DEBUG_TRACE("WS_FRAME_SEND: type=OK len=%zu data=%.100s%s",
response_len,
response_str,
response_len > 100 ? "..." : "");
// Queue message for proper libwebsockets pattern
if (queue_message(wsi, pss, response_str, response_len, LWS_WRITE_TEXT) != 0) {
DEBUG_ERROR("Failed to queue OK response message");
}
free(response_str);
}
cJSON_Delete(response);
}
}
} else if (strcmp(msg_type, "REQ") == 0) {
DEBUG_TRACE("REQ message received, starting processing");
// Check NIP-42 authentication for REQ subscriptions if required
if (pss && pss->nip42_auth_required_subscriptions && !pss->authenticated) {
DEBUG_TRACE("REQ rejected: NIP-42 authentication required");
if (!pss->auth_challenge_sent) {
send_nip42_auth_challenge(wsi, pss);
} else {
send_notice_message(wsi, "NIP-42 authentication required for subscriptions");
DEBUG_WARN("REQ rejected: NIP-42 authentication required");
}
cJSON_Delete(json);
// Note: complete_message points to reassembly_buffer, which is managed separately
// and should not be freed here - it will be cleaned up in LWS_CALLBACK_CLOSED
return 0;
}
DEBUG_TRACE("REQ message passed authentication check");
// Handle REQ message
cJSON* sub_id = cJSON_GetArrayItem(json, 1);
if (sub_id && cJSON_IsString(sub_id)) {
const char* subscription_id = cJSON_GetStringValue(sub_id);
DEBUG_TRACE("Processing REQ message for subscription %s", subscription_id);
// Validate subscription ID before processing
if (!subscription_id) {
DEBUG_TRACE("REQ rejected: NULL subscription ID");
send_notice_message(wsi, "error: invalid subscription ID");
DEBUG_WARN("REQ rejected: NULL subscription ID");
record_malformed_request(pss);
cJSON_Delete(json);
// Note: complete_message points to reassembly_buffer, which is managed separately
// and should not be freed here - it will be cleaned up in LWS_CALLBACK_CLOSED
return 0;
}
// Validate subscription ID
if (!validate_subscription_id(subscription_id)) {
DEBUG_TRACE("REQ rejected: invalid subscription ID format");
send_notice_message(wsi, "error: invalid subscription ID");
DEBUG_WARN("REQ rejected: invalid subscription ID");
cJSON_Delete(json);
// Note: complete_message points to reassembly_buffer, which is managed separately
// and should not be freed here - it will be cleaned up in LWS_CALLBACK_CLOSED
return 0;
}
DEBUG_TRACE("REQ subscription ID validated: %s", subscription_id);
// Create array of filter objects from position 2 onwards
cJSON* filters = cJSON_CreateArray();
if (!filters) {
DEBUG_TRACE("REQ failed: could not create filters array");
send_notice_message(wsi, "error: failed to process filters");
DEBUG_ERROR("REQ failed: could not create filters array");
cJSON_Delete(json);
// Note: complete_message points to reassembly_buffer, which is managed separately
// and should not be freed here - it will be cleaned up in LWS_CALLBACK_CLOSED
return 0;
}
int json_size = cJSON_GetArraySize(json);
int filter_count = 0;
for (int i = 2; i < json_size; i++) {
cJSON* filter = cJSON_GetArrayItem(json, i);
if (filter) {
cJSON_AddItemToArray(filters, cJSON_Duplicate(filter, 1));
filter_count++;
}
}
DEBUG_TRACE("REQ created %d filters from message", filter_count);
// Validate filters before processing
char filter_error[512] = {0};
if (!validate_filter_array(filters, filter_error, sizeof(filter_error))) {
DEBUG_TRACE("REQ rejected: filter validation failed - %s", filter_error);
send_notice_message(wsi, filter_error);
DEBUG_WARN("REQ rejected: invalid filters");
record_malformed_request(pss);
cJSON_Delete(filters);
cJSON_Delete(json);
// Note: complete_message points to reassembly_buffer, which is managed separately
// and should not be freed here - it will be cleaned up in LWS_CALLBACK_CLOSED
return 0;
}
DEBUG_TRACE("REQ filters validated successfully");
DEBUG_TRACE("About to call handle_req_message for subscription %s", subscription_id);
handle_req_message(subscription_id, filters, wsi, pss);
DEBUG_TRACE("handle_req_message completed for subscription %s", subscription_id);
// Clean up the filters array we created
cJSON_Delete(filters);
DEBUG_LOG("REQ subscription %s processed, sending EOSE", subscription_id);
// Send EOSE (End of Stored Events)
cJSON* eose_response = cJSON_CreateArray();
if (eose_response) {
cJSON_AddItemToArray(eose_response, cJSON_CreateString("EOSE"));
cJSON_AddItemToArray(eose_response, cJSON_CreateString(subscription_id));
char *eose_str = cJSON_Print(eose_response);
if (eose_str) {
size_t eose_len = strlen(eose_str);
// DEBUG: Log WebSocket frame details before sending
DEBUG_TRACE("WS_FRAME_SEND: type=EOSE len=%zu data=%.100s%s",
eose_len,
eose_str,
eose_len > 100 ? "..." : "");
// Queue message for proper libwebsockets pattern
if (queue_message(wsi, pss, eose_str, eose_len, LWS_WRITE_TEXT) != 0) {
DEBUG_ERROR("Failed to queue EOSE message");
}
free(eose_str);
}
cJSON_Delete(eose_response);
}
} else {
send_notice_message(wsi, "error: missing or invalid subscription ID in REQ");
DEBUG_WARN("REQ rejected: missing or invalid subscription ID");
}
} else if (strcmp(msg_type, "COUNT") == 0) {
// Check NIP-42 authentication for COUNT requests if required
if (pss && pss->nip42_auth_required_subscriptions && !pss->authenticated) {
if (!pss->auth_challenge_sent) {
send_nip42_auth_challenge(wsi, pss);
} else {
send_notice_message(wsi, "NIP-42 authentication required for count requests");
DEBUG_WARN("COUNT rejected: NIP-42 authentication required");
}
cJSON_Delete(json);
// Note: complete_message points to reassembly_buffer, which is managed separately
// and should not be freed here - it will be cleaned up in LWS_CALLBACK_CLOSED
return 0;
}
// Handle COUNT message
cJSON* sub_id = cJSON_GetArrayItem(json, 1);
if (sub_id && cJSON_IsString(sub_id)) {
const char* subscription_id = cJSON_GetStringValue(sub_id);
// Create array of filter objects from position 2 onwards
cJSON* filters = cJSON_CreateArray();
int json_size = cJSON_GetArraySize(json);
for (int i = 2; i < json_size; i++) {
cJSON* filter = cJSON_GetArrayItem(json, i);
if (filter) {
cJSON_AddItemToArray(filters, cJSON_Duplicate(filter, 1));
}
}
// Validate filters before processing
char filter_error[512] = {0};
if (!validate_filter_array(filters, filter_error, sizeof(filter_error))) {
send_notice_message(wsi, filter_error);
DEBUG_WARN("COUNT rejected: invalid filters");
record_malformed_request(pss);
cJSON_Delete(filters);
cJSON_Delete(json);
// Note: complete_message points to reassembly_buffer, which is managed separately
// and should not be freed here - it will be cleaned up in LWS_CALLBACK_CLOSED
return 0;
}
handle_count_message(subscription_id, filters, wsi, pss);
// Clean up the filters array we created
cJSON_Delete(filters);
}
} else if (strcmp(msg_type, "CLOSE") == 0) {
// Handle CLOSE message
cJSON* sub_id = cJSON_GetArrayItem(json, 1);
if (sub_id && cJSON_IsString(sub_id)) {
const char* subscription_id = cJSON_GetStringValue(sub_id);
// Validate subscription ID before processing
if (!subscription_id) {
send_notice_message(wsi, "error: invalid subscription ID in CLOSE");
DEBUG_WARN("CLOSE rejected: NULL subscription ID");
cJSON_Delete(json);
// Note: complete_message points to reassembly_buffer, which is managed separately
// and should not be freed here - it will be cleaned up in LWS_CALLBACK_CLOSED
return 0;
}
// Validate subscription ID
if (!validate_subscription_id(subscription_id)) {
send_notice_message(wsi, "error: invalid subscription ID in CLOSE");
DEBUG_WARN("CLOSE rejected: invalid subscription ID");
cJSON_Delete(json);
// Note: complete_message points to reassembly_buffer, which is managed separately
// and should not be freed here - it will be cleaned up in LWS_CALLBACK_CLOSED
return 0;
}
// CRITICAL FIX: Mark subscription as inactive in global manager FIRST
// This prevents other threads from accessing it during removal
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
subscription_t* target_sub = g_subscription_manager.active_subscriptions;
while (target_sub) {
if (strcmp(target_sub->id, subscription_id) == 0 && target_sub->wsi == wsi) {
target_sub->active = 0; // Mark as inactive immediately
break;
}
target_sub = target_sub->next;
}
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
// Now safe to remove from session list
if (pss) {
pthread_mutex_lock(&pss->session_lock);
struct subscription** current = &pss->subscriptions;
while (*current) {
if (strcmp((*current)->id, subscription_id) == 0) {
struct subscription* to_remove = *current;
*current = to_remove->session_next;
pss->subscription_count--;
break;
}
current = &((*current)->session_next);
}
pthread_mutex_unlock(&pss->session_lock);
}
// Finally remove from global manager (which will free it)
remove_subscription_from_manager(subscription_id, wsi);
// Subscription closed
} else {
send_notice_message(wsi, "error: missing or invalid subscription ID in CLOSE");
DEBUG_WARN("CLOSE rejected: missing or invalid subscription ID");
}
} else if (strcmp(msg_type, "AUTH") == 0) {
// Handle NIP-42 AUTH message
if (cJSON_GetArraySize(json) >= 2) {
cJSON* auth_payload = cJSON_GetArrayItem(json, 1);
if (cJSON_IsString(auth_payload)) {
// AUTH challenge response: ["AUTH", <challenge>] (unusual)
handle_nip42_auth_challenge_response(wsi, pss, cJSON_GetStringValue(auth_payload));
} else if (cJSON_IsObject(auth_payload)) {
// AUTH signed event: ["AUTH", <event>] (standard NIP-42)
handle_nip42_auth_signed_event(wsi, pss, auth_payload);
} else {
send_notice_message(wsi, "Invalid AUTH message format");
DEBUG_WARN("Received AUTH message with invalid payload type");
}
} else {
send_notice_message(wsi, "AUTH message requires payload");
DEBUG_WARN("Received AUTH message without payload");
}
} else {
// Unknown message type
char unknown_msg[128];
snprintf(unknown_msg, sizeof(unknown_msg), "Unknown message type: %.32s", msg_type);
DEBUG_WARN(unknown_msg);
send_notice_message(wsi, "Unknown message type");
}
}
}
// Clean up the reassembled message
if (json) cJSON_Delete(json);
// Note: complete_message points to reassembly_buffer, which is managed separately
// and should not be freed here - it will be cleaned up in LWS_CALLBACK_CLOSED
return 0; // Fragmented message processed
} else {
// Not the final fragment - continue accumulating
DEBUG_TRACE("Accumulated %zu bytes so far, waiting for more fragments", pss->reassembly_size);
return 0;
}
}
// Handle non-fragmented messages (original code path)
char *message = malloc(len + 1);
if (message) {
memcpy(message, in, len);
message[len] = '\0';
// Parse JSON message (this is the normal program flow)
DEBUG_TRACE("Parsing JSON message of length %zu", strlen(message));
cJSON* json = cJSON_Parse(message);
if (json && cJSON_IsArray(json)) {
// Get message type
@@ -682,7 +1381,7 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
if (kind_obj && cJSON_IsNumber(kind_obj)) {
int event_kind = (int)cJSON_GetNumberValue(kind_obj);
DEBUG_TRACE("Processing event kind %d", event_kind);
DEBUG_TRACE("Processing event kind %d, message length: %zu", event_kind, strlen(message));
// Log reception of Kind 23456 events
if (event_kind == 23456) {
@@ -885,10 +1584,13 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
}
}
} else if (strcmp(msg_type, "REQ") == 0) {
DEBUG_TRACE("REQ message received, starting processing");
// Log the full REQ message for debugging
// Check NIP-42 authentication for REQ subscriptions if required
if (pss && pss->nip42_auth_required_subscriptions && !pss->authenticated) {
DEBUG_TRACE("REQ rejected: NIP-42 authentication required");
if (!pss->auth_challenge_sent) {
send_nip42_auth_challenge(wsi, pss);
} else {
@@ -900,6 +1602,8 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
return 0;
}
DEBUG_TRACE("REQ message passed authentication check");
// Handle REQ message
cJSON* sub_id = cJSON_GetArrayItem(json, 1);
@@ -910,6 +1614,7 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
// Validate subscription ID before processing
if (!subscription_id) {
DEBUG_TRACE("REQ rejected: NULL subscription ID");
send_notice_message(wsi, "error: invalid subscription ID");
DEBUG_WARN("REQ rejected: NULL subscription ID");
record_malformed_request(pss);
@@ -920,6 +1625,7 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
// Validate subscription ID
if (!validate_subscription_id(subscription_id)) {
DEBUG_TRACE("REQ rejected: invalid subscription ID format");
send_notice_message(wsi, "error: invalid subscription ID");
DEBUG_WARN("REQ rejected: invalid subscription ID");
cJSON_Delete(json);
@@ -927,9 +1633,12 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
return 0;
}
DEBUG_TRACE("REQ subscription ID validated: %s", subscription_id);
// Create array of filter objects from position 2 onwards
cJSON* filters = cJSON_CreateArray();
if (!filters) {
DEBUG_TRACE("REQ failed: could not create filters array");
send_notice_message(wsi, "error: failed to process filters");
DEBUG_ERROR("REQ failed: could not create filters array");
cJSON_Delete(json);
@@ -938,16 +1647,21 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
}
int json_size = cJSON_GetArraySize(json);
int filter_count = 0;
for (int i = 2; i < json_size; i++) {
cJSON* filter = cJSON_GetArrayItem(json, i);
if (filter) {
cJSON_AddItemToArray(filters, cJSON_Duplicate(filter, 1));
filter_count++;
}
}
DEBUG_TRACE("REQ created %d filters from message", filter_count);
// Validate filters before processing
char filter_error[512] = {0};
if (!validate_filter_array(filters, filter_error, sizeof(filter_error))) {
DEBUG_TRACE("REQ rejected: filter validation failed - %s", filter_error);
send_notice_message(wsi, filter_error);
DEBUG_WARN("REQ rejected: invalid filters");
record_malformed_request(pss);
@@ -957,7 +1671,11 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
return 0;
}
DEBUG_TRACE("REQ filters validated successfully");
DEBUG_TRACE("About to call handle_req_message for subscription %s", subscription_id);
handle_req_message(subscription_id, filters, wsi, pss);
DEBUG_TRACE("handle_req_message completed for subscription %s", subscription_id);
// Clean up the filters array we created
cJSON_Delete(filters);
@@ -1191,6 +1909,15 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
pss->message_queue_count = 0;
pss->writeable_requested = 0;
// Clean up message reassembly buffer
if (pss->reassembly_buffer) {
free(pss->reassembly_buffer);
pss->reassembly_buffer = NULL;
}
pss->reassembly_size = 0;
pss->reassembly_capacity = 0;
pss->reassembly_active = 0;
// Clean up session subscriptions - copy IDs first to avoid use-after-free
pthread_mutex_lock(&pss->session_lock);