v1.0.6 - Working on cleaning up subscriptions which were piling up. Set a startup cleanup, and a connection age limit.

This commit is contained in:
Your Name
2025-12-05 07:37:57 -04:00
parent 9b35f463ae
commit 03f036d60d
10 changed files with 663 additions and 5 deletions

View File

@@ -65,6 +65,9 @@ static const struct {
{"max_total_subscriptions", "5000"},
{"max_filters_per_subscription", "10"},
// Connection Management
{"max_connection_seconds", "86400"}, // 24 hours (0 = disabled)
// Event Processing Limits
{"max_event_tags", "100"},
{"max_content_length", "8196"},

View File

@@ -1807,7 +1807,8 @@ int main(int argc, char* argv[]) {
return 1;
}
// Cleanup orphaned subscriptions from previous runs
cleanup_all_subscriptions_on_startup();
// Start WebSocket Nostr relay server (port from CLI override or configuration)
int result = start_websocket_relay(cli_options.port_override, cli_options.strict_port); // Use CLI port override if specified, otherwise config

View File

@@ -12,8 +12,8 @@
// Version information (auto-updated by build system)
#define VERSION_MAJOR 1
#define VERSION_MINOR 0
#define VERSION_PATCH 5
#define VERSION "v1.0.5"
#define VERSION_PATCH 6
#define VERSION "v1.0.6"
// Avoid VERSION_MAJOR redefinition warning from nostr_core_lib
#undef VERSION_MAJOR

View File

@@ -999,6 +999,44 @@ void update_subscription_events_sent(const char* sub_id, int events_sent) {
}
}
// Cleanup all subscriptions on startup
void cleanup_all_subscriptions_on_startup(void) {
if (!g_db) {
DEBUG_ERROR("Database not available for startup cleanup");
return;
}
DEBUG_LOG("Performing startup subscription cleanup");
// Mark all active subscriptions as disconnected
const char* sql =
"UPDATE subscriptions "
"SET ended_at = strftime('%s', 'now') "
"WHERE event_type = 'created' AND ended_at IS NULL";
sqlite3_stmt* stmt;
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
if (rc != SQLITE_OK) {
DEBUG_ERROR("Failed to prepare startup cleanup query");
return;
}
rc = sqlite3_step(stmt);
int changes = sqlite3_changes(g_db);
sqlite3_finalize(stmt);
if (rc != SQLITE_DONE) {
DEBUG_ERROR("Failed to execute startup cleanup");
return;
}
if (changes > 0) {
DEBUG_LOG("Startup cleanup: marked %d orphaned subscriptions as disconnected", changes);
} else {
DEBUG_LOG("Startup cleanup: no orphaned subscriptions found");
}
}
///////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////

View File

@@ -120,4 +120,7 @@ void update_subscription_events_sent(const char* sub_id, int events_sent);
// Subscription query functions
int has_subscriptions_for_kind(int event_kind);
// Startup cleanup function
void cleanup_all_subscriptions_on_startup(void);
#endif // SUBSCRIPTIONS_H

View File

@@ -1984,6 +1984,73 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
return 0;
}
// Check and disconnect connections that have exceeded max age
// This function works by checking connection age through the subscription system
static void check_connection_age(int max_connection_seconds) {
if (max_connection_seconds <= 0) {
return; // Feature disabled
}
time_t current_time = time(NULL);
// Lock the subscription manager to safely iterate through subscriptions
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
// Track unique WSI pointers we've already checked to avoid duplicate checks
struct lws** checked_wsis = NULL;
int checked_count = 0;
int checked_capacity = 0;
subscription_t* sub = g_subscription_manager.active_subscriptions;
while (sub) {
if (!sub->active || !sub->wsi) {
sub = sub->next;
continue;
}
// Check if we've already processed this WSI
int already_checked = 0;
for (int i = 0; i < checked_count; i++) {
if (checked_wsis[i] == sub->wsi) {
already_checked = 1;
break;
}
}
if (!already_checked) {
// Get per-session data to check connection age
struct per_session_data *pss = (struct per_session_data *)lws_wsi_user(sub->wsi);
if (pss && pss->connection_established > 0) {
time_t connection_age = current_time - pss->connection_established;
if (connection_age >= max_connection_seconds) {
DEBUG_LOG("Disconnecting client %s: connection age %ld seconds exceeds limit %d seconds",
pss->client_ip, connection_age, max_connection_seconds);
// Close connection with normal closure status
lws_close_reason(sub->wsi, LWS_CLOSE_STATUS_NORMAL,
(unsigned char*)"Connection age limit reached", 28);
}
}
// Add to checked list
if (checked_count >= checked_capacity) {
checked_capacity = checked_capacity == 0 ? 64 : checked_capacity * 2;
checked_wsis = realloc(checked_wsis, checked_capacity * sizeof(struct lws*));
}
checked_wsis[checked_count++] = sub->wsi;
}
sub = sub->next;
}
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
// Cleanup
free(checked_wsis);
}
// WebSocket protocol definition
static struct lws_protocols protocols[] = {
{
@@ -2152,6 +2219,9 @@ int start_websocket_relay(int port_override, int strict_port) {
// Static variable for status post timing (initialize to 0 for immediate first post)
static time_t last_status_post_time = 0;
// Static variable for connection age check timing
static time_t last_connection_age_check = 0;
// Main event loop with proper signal handling
while (g_server_running && !g_shutdown_flag) {
@@ -2174,6 +2244,13 @@ int start_websocket_relay(int port_override, int strict_port) {
generate_and_post_status_event();
}
}
// Check connection age limits (every 60 seconds)
int max_connection_seconds = get_config_int("max_connection_seconds", 86400);
if (max_connection_seconds > 0 && (current_time - last_connection_age_check >= 60)) {
last_connection_age_check = current_time;
check_connection_age(max_connection_seconds);
}
}
lws_context_destroy(ws_context);