diff --git a/bun.lockb b/bun.lockb index 0bf8757..4fdf93f 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/nip42.test.ts b/nip42.test.ts index 794bc5f..ff1f277 100644 --- a/nip42.test.ts +++ b/nip42.test.ts @@ -1,15 +1,14 @@ import { test, expect } from 'bun:test' -import 'websocket-polyfill' import { makeAuthEvent } from './nip42.ts' -import { relayInit } from './relay.ts' +import { relayConnect } from './relay.ts' test('auth flow', () => { - const relay = relayInit('wss://nostr.wine') + const relay = relayConnect('wss://nostr.wine') const auth = makeAuthEvent(relay.url, 'chachacha') expect(auth.tags).toHaveLength(2) - expect(auth.tags[0]).toEqual(['relay', 'wss://nostr.wine']) + expect(auth.tags[0]).toEqual(['relay', 'wss://nostr.wine/']) expect(auth.tags[1]).toEqual(['challenge', 'chachacha']) expect(auth.kind).toEqual(22242) }) diff --git a/package.json b/package.json index ffd426f..d1ef8ed 100644 --- a/package.json +++ b/package.json @@ -185,7 +185,6 @@ "node-fetch": "^2.6.9", "prettier": "^3.0.3", "tsd": "^0.22.0", - "typescript": "^5.0.4", - "websocket-polyfill": "^0.0.3" + "typescript": "^5.0.4" } } diff --git a/pool.test.ts b/pool.test.ts index e89b50b..176fb66 100644 --- a/pool.test.ts +++ b/pool.test.ts @@ -1,5 +1,4 @@ import { test, expect } from 'bun:test' -import 'websocket-polyfill' import { finishEvent, type Event } from './event.ts' import { generatePrivateKey, getPublicKey } from './keys.ts' diff --git a/relay.test.ts b/relay.test.ts index 32033ad..7e01a82 100644 --- a/relay.test.ts +++ b/relay.test.ts @@ -1,125 +1,99 @@ -import { test, expect } from 'bun:test' -import 'websocket-polyfill' +import { test, expect, afterEach, beforeEach } from 'bun:test' import { finishEvent } from './event.ts' import { generatePrivateKey, getPublicKey } from './keys.ts' -import { relayInit } from './relay.ts' +import { Relay } from './relay.ts' -let relay = relayInit('wss://relay.damus.io/') +let relay = new Relay('wss://public.relaying.io') -beforeAll(() => { +beforeEach(() => { relay.connect() }) -afterAll(() => { +afterEach(() => { relay.close() }) -test('connectivity', () => { - return expect( - new Promise(resolve => { - relay.on('connect', () => { - resolve(true) - }) - relay.on('error', () => { - resolve(false) - }) - }), - ).resolves.toBe(true) +test('connectivity', async () => { + await relay.connect() + expect(relay.connected).toBeTrue() }) test('querying', async () => { - var resolve1: (value: boolean) => void - var resolve2: (value: boolean) => void + let resolve1: () => void + let resolve2: () => void - let sub = relay.sub([ - { - ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'], - }, - ]) - sub.on('event', event => { - expect(event).toHaveProperty('id', 'd7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027') - resolve1(true) - }) - sub.on('eose', () => { - resolve2(true) - }) - - let [t1, t2] = await Promise.all([ - new Promise(resolve => { + let waiting = Promise.all([ + new Promise(resolve => { resolve1 = resolve }), - new Promise(resolve => { + new Promise(resolve => { resolve2 = resolve }), ]) - expect(t1).toEqual(true) - expect(t2).toEqual(true) + relay.subscribe( + [ + { + ids: ['3abc6cbb215af0412ab2c9c8895d96a084297890fd0b4018f8427453350ca2e4'], + }, + ], + { + onevent(event) { + expect(event).toHaveProperty('id', '3abc6cbb215af0412ab2c9c8895d96a084297890fd0b4018f8427453350ca2e4') + expect(event).toHaveProperty('content', '+') + expect(event).toHaveProperty('kind', 7) + resolve1() + }, + oneose() { + resolve2() + }, + }, + ) + + let [t1, t2] = await waiting + expect(t1).toBeUndefined() + expect(t2).toBeUndefined() }, 10000) -test('async iterator', async () => { - let sub = relay.sub([ - { - ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'], - }, - ]) - - for await (const event of sub.events) { - expect(event).toHaveProperty('id', 'd7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027') - break - } -}) - -test('get()', async () => { - let event = await relay.get({ - ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'], - }) - - expect(event).toHaveProperty('id', 'd7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027') -}) - -test('list()', async () => { - let events = await relay.list([ - { - authors: ['3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d'], - kinds: [1], - limit: 2, - }, - ]) - - expect(events.length).toEqual(2) -}) - -test('listening (twice) and publishing', async () => { +test('listening and publishing and closing', async () => { let sk = generatePrivateKey() let pk = getPublicKey(sk) - var resolve1: (value: boolean) => void - var resolve2: (value: boolean) => void + var resolve1: (_: void) => void + var resolve2: (_: void) => void - let sub = relay.sub([ - { - kinds: [27572], - authors: [pk], - }, + let waiting = Promise.all([ + new Promise(resolve => { + resolve1 = resolve + }), + new Promise(resolve => { + resolve2 = resolve + }), ]) - sub.on('event', event => { - expect(event).toHaveProperty('pubkey', pk) - expect(event).toHaveProperty('kind', 27572) - expect(event).toHaveProperty('content', 'nostr-tools test suite') - resolve1(true) - }) - sub.on('event', event => { - expect(event).toHaveProperty('pubkey', pk) - expect(event).toHaveProperty('kind', 27572) - expect(event).toHaveProperty('content', 'nostr-tools test suite') - resolve2(true) - }) + let sub = await relay.subscribe( + [ + { + kinds: [23571], + authors: [pk], + }, + ], + { + onevent(event) { + expect(event).toHaveProperty('pubkey', pk) + expect(event).toHaveProperty('kind', 23571) + expect(event).toHaveProperty('content', 'nostr-tools test suite') + resolve1() + }, + onclose() { + resolve2() + }, + }, + ) let event = finishEvent( { - kind: 27572, + kind: 23571, created_at: Math.floor(Date.now() / 1000), tags: [], content: 'nostr-tools test suite', @@ -127,15 +101,10 @@ test('listening (twice) and publishing', async () => { sk, ) - relay.publish(event) - return expect( - Promise.all([ - new Promise(resolve => { - resolve1 = resolve - }), - new Promise(resolve => { - resolve2 = resolve - }), - ]), - ).resolves.toEqual([true, true]) + await relay.publish(event) + sub.close() + + let [t1, t2] = await waiting + expect(t1).toBeUndefined() + expect(t2).toBeUndefined() }) diff --git a/relay.ts b/relay.ts index 0011a98..82df7da 100644 --- a/relay.ts +++ b/relay.ts @@ -12,68 +12,13 @@ export function relayConnect(url: string) { return relay } -class Subscription { - public readonly relay: Relay - public readonly id: string - public closed: boolean = false - public eosed: boolean = false - - public alreadyHaveEvent: ((id: string) => boolean) | null = null - public receivedEvent: ((id: string) => boolean) | null = null - public readonly filters: Filter[] - - public onevent: (evt: Event) => void - public oneose: (() => void) | null = null - public onclose: ((reason: string) => void) | null = null - - constructor(relay: Relay, filters: Filter[], params: SubscriptionParams) { - this.relay = relay - this.filters = filters - this.id = params.id - this.onevent = params.onevent - this.oneose = params.oneose || null - this.onclose = params.onclose || null - this.alreadyHaveEvent = params.alreadyHaveEvent || null - this.receivedEvent = params.receivedEvent || null - } - - public close(reason: string) { - if (!this.closed) { - // if the connection was closed by the user calling .close() we will send a CLOSE message - // otherwise this._open will be already set to false so we will skip this - this.relay.send('["CLOSE",' + JSON.stringify(this.id) + ']') - this.closed = true - } - this.onclose?.(reason) - } -} - -type SubscriptionParams = { - id: string - onevent: (evt: Event) => void - oneose?: () => void - onclose?: (reason: string) => void - alreadyHaveEvent: ((id: string) => boolean) | null - receivedEvent: ((id: string) => boolean) | null -} - -type CountResolver = { - resolve: (count: number) => void - reject: (err: Error) => void -} - -type EventPublishResolver = { - resolve: (reason: string) => void - reject: (err: Error) => void -} - -class Relay { +export class Relay { public readonly url: string private _connected: boolean = false public trusted: boolean = false public onclose: (() => void) | null = null - public onnotice: (msg: string) => void = console.log + public onnotice: (msg: string) => void = msg => console.log(`NOTICE from ${this.url}: ${msg}`) private connectionPromise: Promise | undefined private openSubs = new Map() @@ -81,7 +26,7 @@ class Relay { private openEventPublishes = new Map() private ws: WebSocket | undefined private incomingMessageQueue = new Queue() - private handleNextInterval: ReturnType | null = null + private queueRunning = false private challenge: string | undefined private serial: number = 0 @@ -112,6 +57,8 @@ class Relay { public async connect(): Promise { if (this.connectionPromise) return this.connectionPromise + + this.challenge = undefined this.connectionPromise = new Promise((resolve, reject) => { try { this.ws = new WebSocket(this.url) @@ -125,8 +72,8 @@ class Relay { resolve() } - this.ws.onerror = () => { - reject() + this.ws.onerror = ev => { + reject((ev as any).message) if (this._connected) { this.onclose?.() this.closeAllSubscriptions('relay connection errored') @@ -143,19 +90,30 @@ class Relay { this.ws.onmessage = ev => { this.incomingMessageQueue.enqueue(ev.data as string) - if (!this.handleNextInterval) { - this.handleNextInterval = setInterval(this.handleNext.bind(this), 0) + if (!this.queueRunning) { + this.runQueue() } } }) + + return this.connectionPromise } - private handleNext() { + private async runQueue() { + this.queueRunning = true + while (true) { + if (false === this.handleNext()) { + break + } + await Promise.resolve() + } + this.queueRunning = false + } + + private handleNext(): undefined | false { const json = this.incomingMessageQueue.dequeue() if (!json) { - clearInterval(this.handleNextInterval as ReturnType) - this.handleNextInterval = null - return + return false } const subid = getSubscriptionId(json) @@ -249,36 +207,106 @@ class Relay { if (!this.challenge) throw new Error("can't perform auth, no challenge was received") const evt = nip42.makeAuthEvent(this.url, this.challenge) await Promise.all([signAuthEvent(evt), this.connect()]) - this.ws?.send('["AUTH",' + JSON.stringify(evt) + ']') + this.send('["AUTH",' + JSON.stringify(evt) + ']') } - public async publish(event: Event) { + public async publish(event: Event): Promise { await this.connect() - const ret = new Promise((resolve, reject) => { + const ret = new Promise((resolve, reject) => { this.openEventPublishes.set(event.id, { resolve, reject }) }) - this.ws?.send('["EVENT",' + JSON.stringify(event) + ']') + this.send('["EVENT",' + JSON.stringify(event) + ']') return ret } - public async count(filters: Filter[], params: { id?: string | null }) { + public async count(filters: Filter[], params: { id?: string | null }): Promise { await this.connect() this.serial++ const id = params?.id || 'count:' + this.serial - const ret = new Promise((resolve, reject) => { + const ret = new Promise((resolve, reject) => { this.openCountRequests.set(id, { resolve, reject }) }) - this.ws?.send('["COUNT","' + id + '"' + JSON.stringify(filters) + ']') + this.send('["COUNT","' + id + '",' + JSON.stringify(filters) + ']') return ret } - public async subscribe(filters: Filter[], params: SubscriptionParams & { id: string | undefined }) { + public async subscribe(filters: Filter[], params: Partial) { await this.connect() this.serial++ - params.id = params.id || 'sub:' + this.serial - const subscription = new Subscription(this, filters, params) - this.openSubs.set(params.id, subscription) - this.ws?.send('["REQ","' + params.id + '"' + JSON.stringify(filters) + ']') + const id = params.id || 'sub:' + this.serial + const subscription = new Subscription(this, filters, { + onevent: event => { + console.warn( + `onevent() callback not defined for subscription '${id}' in relay ${this.url}. event received:`, + event, + ) + }, + ...params, + id, + }) + this.openSubs.set(id, subscription) + this.send('["REQ","' + id + '",' + JSON.stringify(filters).substring(1)) return subscription } + + public close() { + this.closeAllSubscriptions('relay connection closed by us') + this._connected = false + this.ws?.close() + } +} + +export class Subscription { + public readonly relay: Relay + public readonly id: string + public closed: boolean = false + public eosed: boolean = false + + public alreadyHaveEvent: ((id: string) => boolean) | undefined + public receivedEvent: ((id: string) => boolean) | undefined + public readonly filters: Filter[] + + public onevent: (evt: Event) => void + public oneose: (() => void) | undefined + public onclose: ((reason: string) => void) | undefined + + constructor(relay: Relay, filters: Filter[], params: SubscriptionParams) { + this.relay = relay + this.filters = filters + this.id = params.id + this.onevent = params.onevent + this.oneose = params.oneose + this.onclose = params.onclose + this.alreadyHaveEvent = params.alreadyHaveEvent + this.receivedEvent = params.receivedEvent + } + + public close(reason: string = 'closed by caller') { + if (!this.closed) { + // if the connection was closed by the user calling .close() we will send a CLOSE message + // otherwise this._open will be already set to false so we will skip this + this.relay.send('["CLOSE",' + JSON.stringify(this.id) + ']') + this.closed = true + } + this.onclose?.(reason) + } +} + +export type SubscriptionParams = { + id: string + onevent: (evt: Event) => void + oneose?: () => void + onclose?: (reason: string) => void + alreadyHaveEvent?: (id: string) => boolean + receivedEvent?: (id: string) => boolean +} + +export type CountResolver = { + resolve: (count: number) => void + reject: (err: Error) => void +} + +export type EventPublishResolver = { + resolve: (reason: string) => void + reject: (err: Error) => void } diff --git a/utils.test.ts b/utils.test.ts index 4335c13..4a3d141 100644 --- a/utils.test.ts +++ b/utils.test.ts @@ -1,6 +1,6 @@ import { describe, test, expect } from 'bun:test' import { buildEvent } from './test-helpers.ts' -import { MessageQueue, insertEventIntoAscendingList, insertEventIntoDescendingList } from './utils.ts' +import { Queue, insertEventIntoAscendingList, insertEventIntoDescendingList } from './utils.ts' import type { Event } from './event.ts' @@ -216,27 +216,25 @@ describe('inserting into a asc sorted list of events', () => { describe('enque a message into MessageQueue', () => { test('enque into an empty queue', () => { - const queue = new MessageQueue() + const queue = new Queue() queue.enqueue('node1') expect(queue.first!.value).toBe('node1') }) test('enque into a non-empty queue', () => { - const queue = new MessageQueue() + const queue = new Queue() queue.enqueue('node1') queue.enqueue('node3') queue.enqueue('node2') expect(queue.first!.value).toBe('node1') expect(queue.last!.value).toBe('node2') - expect(queue.size).toBe(3) }) test('dequeue from an empty queue', () => { - const queue = new MessageQueue() + const queue = new Queue() const item1 = queue.dequeue() expect(item1).toBe(null) - expect(queue.size).toBe(0) }) test('dequeue from a non-empty queue', () => { - const queue = new MessageQueue() + const queue = new Queue() queue.enqueue('node1') queue.enqueue('node3') queue.enqueue('node2') @@ -246,14 +244,13 @@ describe('enque a message into MessageQueue', () => { expect(item2).toBe('node3') }) test('dequeue more than in queue', () => { - const queue = new MessageQueue() + const queue = new Queue() queue.enqueue('node1') queue.enqueue('node3') const item1 = queue.dequeue() expect(item1).toBe('node1') const item2 = queue.dequeue() expect(item2).toBe('node3') - expect(queue.size).toBe(0) const item3 = queue.dequeue() expect(item3).toBe(null) }) diff --git a/utils.ts b/utils.ts index cce4f6b..ab6ddd4 100644 --- a/utils.ts +++ b/utils.ts @@ -94,11 +94,11 @@ export function insertEventIntoAscendingList(sortedArray: Event[], event: Event) export class QueueNode { public value: V - public next: QueueNode | null + public next: QueueNode | null = null + public prev: QueueNode | null = null constructor(message: V) { this.value = message - this.next = null } } @@ -114,9 +114,17 @@ export class Queue { enqueue(value: V): boolean { const newNode = new QueueNode(value) if (!this.last) { + // list is empty this.first = newNode this.last = newNode + } else if (this.last === this.first) { + // list has a single element + this.last = newNode + this.last.prev = this.first + this.first.next = newNode } else { + // list has elements, add as last + newNode.prev = this.last this.last.next = newNode this.last = newNode } @@ -126,10 +134,16 @@ export class Queue { dequeue(): V | null { if (!this.first) return null - let prev = this.first - this.first = prev.next - prev.next = null + if (this.first === this.last) { + const target = this.first + this.first = null + this.last = null + return target.value + } - return prev.value + const target = this.first + this.first = target.next + + return target.value } }