diff --git a/abstract-relay.ts b/abstract-relay.ts index 320f293..abd7801 100644 --- a/abstract-relay.ts +++ b/abstract-relay.ts @@ -3,9 +3,8 @@ import type { Event, EventTemplate, VerifiedEvent, Nostr, NostrEvent } from './core.ts' import { matchFilters, type Filter } from './filter.ts' import { getHex64, getSubscriptionId } from './fakejson.ts' -import { Queue, normalizeURL } from './utils.ts' +import { normalizeURL } from './utils.ts' import { makeAuthEvent } from './nip42.ts' -import { yieldThread } from './helpers.ts' type RelayWebSocket = WebSocket & { ping?(): void @@ -51,8 +50,6 @@ export class AbstractRelay { private openCountRequests = new Map() private openEventPublishes = new Map() private ws: RelayWebSocket | undefined - private incomingMessageQueue = new Queue() - private queueRunning = false private challenge: string | undefined private authPromise: Promise | undefined private serial: number = 0 @@ -269,122 +266,6 @@ export class AbstractRelay { } } - private async runQueue() { - this.queueRunning = true - while (true) { - if (false === this.handleNext()) { - break - } - await yieldThread() - } - this.queueRunning = false - } - - private handleNext(): undefined | false { - const json = this.incomingMessageQueue.dequeue() - if (!json) { - return false - } - - // shortcut EVENT sub - const subid = getSubscriptionId(json) - if (subid) { - const so = this.openSubs.get(subid as string) - if (!so) { - // this is an EVENT message, but for a subscription we don't have, so just stop here - return - } - - // this will be called only when this message is a EVENT message for a subscription we have - // we do this before parsing the JSON to not have to do that for duplicate events - // since JSON parsing is slow - const id = getHex64(json, 'id') - const alreadyHave = so.alreadyHaveEvent?.(id) - - // notify any interested client that the relay has this event - // (do this after alreadyHaveEvent() because the client may rely on this to answer that) - so.receivedEvent?.(this, id) - - if (alreadyHave) { - // if we had already seen this event we can just stop here - return - } - } - - try { - let data = JSON.parse(json) - // we won't do any checks against the data since all failures (i.e. invalid messages from relays) - // will naturally be caught by the encompassing try..catch block - - switch (data[0]) { - case 'EVENT': { - const so = this.openSubs.get(data[1] as string) as Subscription - const event = data[2] as NostrEvent - if (this.verifyEvent(event) && matchFilters(so.filters, event)) { - so.onevent(event) - } - if (!so.lastEmitted || so.lastEmitted < event.created_at) so.lastEmitted = event.created_at - return - } - case 'COUNT': { - const id: string = data[1] - const payload = data[2] as { count: number } - const cr = this.openCountRequests.get(id) as CountResolver - if (cr) { - cr.resolve(payload.count) - this.openCountRequests.delete(id) - } - return - } - case 'EOSE': { - const so = this.openSubs.get(data[1] as string) - if (!so) return - so.receivedEose() - return - } - case 'OK': { - const id: string = data[1] - const ok: boolean = data[2] - const reason: string = data[3] - const ep = this.openEventPublishes.get(id) as EventPublishResolver - if (ep) { - clearTimeout(ep.timeout) - if (ok) ep.resolve(reason) - else ep.reject(new Error(reason)) - this.openEventPublishes.delete(id) - } - return - } - case 'CLOSED': { - const id: string = data[1] - const so = this.openSubs.get(id) - if (!so) return - so.closed = true - so.close(data[2] as string) - return - } - case 'NOTICE': { - this.onnotice(data[1] as string) - return - } - case 'AUTH': { - this.challenge = data[1] as string - if (this.onauth) { - this.auth(this.onauth) - } - return - } - default: { - const so = this.openSubs.get(data[1]) - so?.oncustom?.(data) - return - } - } - } catch (err) { - return - } - } - public async send(message: string) { if (!this.connectionPromise) throw new SendingOnClosedConnection(message, this.url) @@ -488,9 +369,109 @@ export class AbstractRelay { // this is the function assigned to this.ws.onmessage // it's exposed for testing and debugging purposes public _onmessage(ev: MessageEvent) { - this.incomingMessageQueue.enqueue(ev.data as string) - if (!this.queueRunning) { - this.runQueue() + const json = ev.data + if (!json) { + return false + } + + // shortcut EVENT sub + const subid = getSubscriptionId(json) + if (subid) { + const so = this.openSubs.get(subid as string) + if (!so) { + // this is an EVENT message, but for a subscription we don't have, so just stop here + return + } + + // this will be called only when this message is a EVENT message for a subscription we have + // we do this before parsing the JSON to not have to do that for duplicate events + // since JSON parsing is slow + const id = getHex64(json, 'id') + const alreadyHave = so.alreadyHaveEvent?.(id) + + // notify any interested client that the relay has this event + // (do this after alreadyHaveEvent() because the client may rely on this to answer that) + so.receivedEvent?.(this, id) + + if (alreadyHave) { + // if we had already seen this event we can just stop here + return + } + } + + try { + let data = JSON.parse(json) + // we won't do any checks against the data since all failures (i.e. invalid messages from relays) + // will naturally be caught by the encompassing try..catch block + + switch (data[0]) { + case 'EVENT': { + const so = this.openSubs.get(data[1] as string) as Subscription + const event = data[2] as NostrEvent + if (this.verifyEvent(event) && matchFilters(so.filters, event)) { + so.onevent(event) + } + if (!so.lastEmitted || so.lastEmitted < event.created_at) so.lastEmitted = event.created_at + return + } + case 'COUNT': { + const id: string = data[1] + const payload = data[2] as { count: number } + const cr = this.openCountRequests.get(id) as CountResolver + if (cr) { + cr.resolve(payload.count) + this.openCountRequests.delete(id) + } + return + } + case 'EOSE': { + const so = this.openSubs.get(data[1] as string) + if (!so) return + so.receivedEose() + return + } + case 'OK': { + const id: string = data[1] + const ok: boolean = data[2] + const reason: string = data[3] + const ep = this.openEventPublishes.get(id) as EventPublishResolver + if (ep) { + clearTimeout(ep.timeout) + if (ok) ep.resolve(reason) + else ep.reject(new Error(reason)) + this.openEventPublishes.delete(id) + } + return + } + case 'CLOSED': { + const id: string = data[1] + const so = this.openSubs.get(id) + if (!so) return + so.closed = true + so.close(data[2] as string) + return + } + case 'NOTICE': { + this.onnotice(data[1] as string) + return + } + case 'AUTH': { + this.challenge = data[1] as string + if (this.onauth) { + this.auth(this.onauth) + } + return + } + default: { + const so = this.openSubs.get(data[1]) + so?.oncustom?.(data) + return + } + } + } catch (err) { + const [_, __, event] = JSON.parse(json) + ;(window as any).printer.maybe(event.pubkey, ':: caught err', event, this.url, err) + return } } } diff --git a/helpers.ts b/helpers.ts index 56d5df0..b509c79 100644 --- a/helpers.ts +++ b/helpers.ts @@ -1,37 +1,5 @@ import { verifiedSymbol, type Event, type Nostr, VerifiedEvent } from './core.ts' -export async function yieldThread() { - return new Promise((resolve, reject) => { - try { - // Check if MessageChannel is available - if (typeof MessageChannel !== 'undefined') { - const ch = new MessageChannel() - const handler = () => { - // @ts-ignore (typescript thinks this property should be called `removeListener`, but in fact it's `removeEventListener`) - ch.port1.removeEventListener('message', handler) - resolve() - } - // @ts-ignore (typescript thinks this property should be called `addListener`, but in fact it's `addEventListener`) - ch.port1.addEventListener('message', handler) - ch.port2.postMessage(0) - ch.port1.start() - } else { - if (typeof setImmediate !== 'undefined') { - setImmediate(resolve) - } else if (typeof setTimeout !== 'undefined') { - setTimeout(resolve, 0) - } else { - // Last resort - resolve immediately - resolve() - } - } - } catch (e) { - console.error('during yield: ', e) - reject(e) - } - }) -} - export const alwaysTrue: Nostr['verifyEvent'] = (t: Event): t is VerifiedEvent => { t[verifiedSymbol] = true return true diff --git a/utils.ts b/utils.ts index 5e28787..a6031f8 100644 --- a/utils.ts +++ b/utils.ts @@ -123,62 +123,3 @@ export function mergeReverseSortedLists(list1: NostrEvent[], list2: NostrEvent[] return result } - -export class QueueNode { - public value: V - public next: QueueNode | null = null - public prev: QueueNode | null = null - - constructor(message: V) { - this.value = message - } -} - -export class Queue { - public first: QueueNode | null - public last: QueueNode | null - - constructor() { - this.first = null - this.last = null - } - - enqueue(value: V): boolean { - const newNode = new QueueNode(value) - if (!this.last) { - // list is empty - this.first = newNode - this.last = newNode - } else if (this.last === this.first) { - // list has a single element - this.last = newNode - this.last.prev = this.first - this.first.next = newNode - } else { - // list has elements, add as last - newNode.prev = this.last - this.last.next = newNode - this.last = newNode - } - return true - } - - dequeue(): V | null { - if (!this.first) return null - - if (this.first === this.last) { - const target = this.first - this.first = null - this.last = null - return target.value - } - - const target = this.first - this.first = target.next - if (this.first) { - this.first.prev = null // fix: clean up prev pointer - } - - return target.value - } -}