Compare commits

...

4 Commits

Author SHA1 Message Date
fiatjaf
28f7553187 fix a type so jsr is happy. 2026-02-02 18:49:17 -03:00
fiatjaf
ca29d9b515 ok, we need the prepareSubscription method. 2026-02-02 18:46:50 -03:00
fiatjaf
ab802c8dbe automatic prune broken relay objects and keep track of relay idleness so they can be pruned. 2026-02-02 18:44:52 -03:00
fiatjaf
9db705d86c delete queue test since we don't have queues anymore. 2026-02-02 17:01:59 -03:00
5 changed files with 56 additions and 60 deletions

View File

@@ -88,9 +88,7 @@ export class AbstractSimplePool {
enableReconnect: this.enableReconnect, enableReconnect: this.enableReconnect,
}) })
relay.onclose = () => { relay.onclose = () => {
if (relay && !relay.enableReconnect) { this.relays.delete(url)
this.relays.delete(url)
}
} }
this.relays.set(url, relay) this.relays.set(url, relay)
} }
@@ -102,10 +100,15 @@ export class AbstractSimplePool {
} }
} }
await relay.connect({ try {
timeout: params?.connectionTimeout, await relay.connect({
abort: params?.abort, timeout: params?.connectionTimeout,
}) abort: params?.abort,
})
} catch (err) {
this.relays.delete(url)
throw err
}
return relay return relay
} }
@@ -380,4 +383,19 @@ export class AbstractSimplePool {
this.relays.forEach(conn => conn.close()) this.relays.forEach(conn => conn.close())
this.relays = new Map() this.relays = new Map()
} }
pruneIdleRelays(idleThresholdMs: number = 10000): string[] {
const prunedUrls: string[] = []
// check each relay's idle status and prune if over threshold
for (const [url, relay] of this.relays) {
if (relay.idleSince && Date.now() - relay.idleSince >= idleThresholdMs) {
this.relays.delete(url)
prunedUrls.push(url)
relay.close()
}
}
return prunedUrls
}
} }

View File

@@ -41,6 +41,8 @@ export class AbstractRelay {
public openSubs: Map<string, Subscription> = new Map() public openSubs: Map<string, Subscription> = new Map()
public enablePing: boolean | undefined public enablePing: boolean | undefined
public enableReconnect: boolean public enableReconnect: boolean
public idleSince: number | undefined = Date.now() // when undefined that means it isn't idle
public ongoingOperations: number = 0 // used to compute idleness
private reconnectTimeoutHandle: ReturnType<typeof setTimeout> | undefined private reconnectTimeoutHandle: ReturnType<typeof setTimeout> | undefined
private pingIntervalHandle: ReturnType<typeof setInterval> | undefined private pingIntervalHandle: ReturnType<typeof setInterval> | undefined
private reconnectAttempts: number = 0 private reconnectAttempts: number = 0
@@ -116,12 +118,12 @@ export class AbstractRelay {
this._connected = false this._connected = false
this.connectionPromise = undefined this.connectionPromise = undefined
this.idleSince = undefined
this.onclose?.()
if (this.enableReconnect && !this.skipReconnection) { if (this.enableReconnect && !this.skipReconnection) {
this.reconnect() this.reconnect()
} else { } else {
this.onclose?.()
this.closeAllSubscriptions(reason) this.closeAllSubscriptions(reason)
} }
} }
@@ -227,7 +229,7 @@ export class AbstractRelay {
const sub = this.subscribe( const sub = this.subscribe(
[{ ids: ['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'], limit: 0 }], [{ ids: ['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'], limit: 0 }],
{ {
label: 'forced-ping', label: '<forced-ping>',
oneose: () => { oneose: () => {
resolve(true) resolve(true)
sub.close() sub.close()
@@ -299,6 +301,9 @@ export class AbstractRelay {
} }
public async publish(event: Event): Promise<string> { public async publish(event: Event): Promise<string> {
this.idleSince = undefined
this.ongoingOperations++
const ret = new Promise<string>((resolve, reject) => { const ret = new Promise<string>((resolve, reject) => {
const timeout = setTimeout(() => { const timeout = setTimeout(() => {
const ep = this.openEventPublishes.get(event.id) as EventPublishResolver const ep = this.openEventPublishes.get(event.id) as EventPublishResolver
@@ -310,6 +315,11 @@ export class AbstractRelay {
this.openEventPublishes.set(event.id, { resolve, reject, timeout }) this.openEventPublishes.set(event.id, { resolve, reject, timeout })
}) })
this.send('["EVENT",' + JSON.stringify(event) + ']') this.send('["EVENT",' + JSON.stringify(event) + ']')
// compute idleness state
this.ongoingOperations--
if (this.ongoingOperations === 0) this.idleSince = Date.now()
return ret return ret
} }
@@ -327,6 +337,11 @@ export class AbstractRelay {
filters: Filter[], filters: Filter[],
params: Partial<SubscriptionParams> & { label?: string; id?: string }, params: Partial<SubscriptionParams> & { label?: string; id?: string },
): Subscription { ): Subscription {
if (params.label !== '<forced-ping>') {
this.idleSince = undefined
this.ongoingOperations++
}
const sub = this.prepareSubscription(filters, params) const sub = this.prepareSubscription(filters, params)
sub.fire() sub.fire()
@@ -343,9 +358,9 @@ export class AbstractRelay {
): Subscription { ): Subscription {
this.serial++ this.serial++
const id = params.id || (params.label ? params.label + ':' : 'sub:') + this.serial const id = params.id || (params.label ? params.label + ':' : 'sub:') + this.serial
const subscription = new Subscription(this, id, filters, params) const sub = new Subscription(this, id, filters, params)
this.openSubs.set(id, subscription) this.openSubs.set(id, sub)
return subscription return sub
} }
public close() { public close() {
@@ -360,6 +375,7 @@ export class AbstractRelay {
} }
this.closeAllSubscriptions('relay connection closed by us') this.closeAllSubscriptions('relay connection closed by us')
this._connected = false this._connected = false
this.idleSince = undefined
this.onclose?.() this.onclose?.()
if (this.ws?.readyState === this._WebSocket.OPEN) { if (this.ws?.readyState === this._WebSocket.OPEN) {
this.ws?.close() this.ws?.close()
@@ -368,10 +384,10 @@ export class AbstractRelay {
// this is the function assigned to this.ws.onmessage // this is the function assigned to this.ws.onmessage
// it's exposed for testing and debugging purposes // it's exposed for testing and debugging purposes
public _onmessage(ev: MessageEvent<any>) { public _onmessage(ev: MessageEvent<any>): void {
const json = ev.data const json = ev.data
if (!json) { if (!json) {
return false return
} }
// shortcut EVENT sub // shortcut EVENT sub
@@ -549,6 +565,11 @@ export class Subscription {
this.closed = true this.closed = true
} }
this.relay.openSubs.delete(this.id) this.relay.openSubs.delete(this.id)
// compute idleness state
this.relay.ongoingOperations--
if (this.relay.ongoingOperations === 0) this.relay.idleSince = Date.now()
this.onclose?.(reason) this.onclose?.(reason)
} }
} }

View File

@@ -1,6 +1,6 @@
{ {
"name": "@nostr/tools", "name": "@nostr/tools",
"version": "2.22.2", "version": "2.23.0",
"exports": { "exports": {
".": "./index.ts", ".": "./index.ts",
"./core": "./core.ts", "./core": "./core.ts",

View File

@@ -1,7 +1,7 @@
{ {
"type": "module", "type": "module",
"name": "nostr-tools", "name": "nostr-tools",
"version": "2.22.2", "version": "2.23.0",
"description": "Tools for making a Nostr client.", "description": "Tools for making a Nostr client.",
"repository": { "repository": {
"type": "git", "type": "git",

View File

@@ -1,7 +1,6 @@
import { describe, test, expect } from 'bun:test' import { describe, test, expect } from 'bun:test'
import { buildEvent } from './test-helpers.ts' import { buildEvent } from './test-helpers.ts'
import { import {
Queue,
insertEventIntoAscendingList, insertEventIntoAscendingList,
insertEventIntoDescendingList, insertEventIntoDescendingList,
binarySearch, binarySearch,
@@ -221,48 +220,6 @@ describe('inserting into a asc sorted list of events', () => {
}) })
}) })
describe('enqueue a message into MessageQueue', () => {
test('enqueue into an empty queue', () => {
const queue = new Queue()
queue.enqueue('node1')
expect(queue.first!.value).toBe('node1')
})
test('enqueue into a non-empty queue', () => {
const queue = new Queue()
queue.enqueue('node1')
queue.enqueue('node3')
queue.enqueue('node2')
expect(queue.first!.value).toBe('node1')
expect(queue.last!.value).toBe('node2')
})
test('dequeue from an empty queue', () => {
const queue = new Queue()
const item1 = queue.dequeue()
expect(item1).toBe(null)
})
test('dequeue from a non-empty queue', () => {
const queue = new Queue()
queue.enqueue('node1')
queue.enqueue('node3')
queue.enqueue('node2')
const item1 = queue.dequeue()
expect(item1).toBe('node1')
const item2 = queue.dequeue()
expect(item2).toBe('node3')
})
test('dequeue more than in queue', () => {
const queue = new Queue()
queue.enqueue('node1')
queue.enqueue('node3')
const item1 = queue.dequeue()
expect(item1).toBe('node1')
const item2 = queue.dequeue()
expect(item2).toBe('node3')
const item3 = queue.dequeue()
expect(item3).toBe(null)
})
})
test('binary search', () => { test('binary search', () => {
expect(binarySearch(['a', 'b', 'd', 'e'], b => ('e' < b ? -1 : 'e' === b ? 0 : 1))).toEqual([3, true]) expect(binarySearch(['a', 'b', 'd', 'e'], b => ('e' < b ? -1 : 'e' === b ? 0 : 1))).toEqual([3, true])
expect(binarySearch(['a', 'b', 'd', 'e'], b => ('x' < b ? -1 : 'x' === b ? 0 : 1))).toEqual([4, false]) expect(binarySearch(['a', 'b', 'd', 'e'], b => ('x' < b ? -1 : 'x' === b ? 0 : 1))).toEqual([4, false])