diff --git a/README.md b/README.md index e4c9457..cd5221e 100644 --- a/README.md +++ b/README.md @@ -134,12 +134,7 @@ const pool = new SimplePool() let relays = ['wss://relay.example.com', 'wss://relay.example2.com'] -relays.forEach(async url => { - let relay = pool.ensureRelay(url) - await relay.connect() -}) - -let relay = pool.ensureRelay('wss://relay.example3.com') +let relay = await pool.ensureRelay('wss://relay.example3.com') let subs = pool.sub([...relays, relay], { authors: ['32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245'] diff --git a/package.json b/package.json index ab9280c..b81ab91 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "nostr-tools", - "version": "1.3.0", + "version": "1.3.1", "description": "Tools for making a Nostr client.", "repository": { "type": "git", diff --git a/pool.test.js b/pool.test.js index 28e0c05..1c6805f 100644 --- a/pool.test.js +++ b/pool.test.js @@ -19,50 +19,23 @@ let relays = [ 'wss://nostr.zebedee.cloud/' ] -beforeAll(async () => { - Promise.all( - relays.map(relay => { - try { - let r = pool.ensureRelay(relay) - return r.connect() - } catch (err) { - /***/ - } - }) - ) -}) - afterAll(async () => { - relays.forEach(relay => { - try { - let r = pool.ensureRelay(relay) - r.close() - } catch (err) { - /***/ - } - }) + await pool.close([...relays, 'wss://nostr-relay.untethr.me']) }) test('removing duplicates when querying', async () => { let priv = generatePrivateKey() let pub = getPublicKey(priv) - let subs = pool.sub(relays, [ - { - authors: [pub] - } - ]) - + let sub = pool.sub(relays, [{authors: [pub]}]) let received = [] - subs.forEach(sub => - sub.on('event', event => { - // this should be called only once even though we're listening - // to multiple relays because the events will be catched and - // deduplicated efficiently (without even being parsed) - received.push(event) - }) - ) + sub.on('event', event => { + // this should be called only once even though we're listening + // to multiple relays because the events will be catched and + // deduplicated efficiently (without even being parsed) + received.push(event) + }) let event = { pubkey: pub, @@ -81,25 +54,22 @@ test('removing duplicates when querying', async () => { expect(received).toHaveLength(1) }) -test('removing duplicates correctly when double querying', async () => { +test('same with double querying', async () => { let priv = generatePrivateKey() let pub = getPublicKey(priv) - let subs1 = pool.sub(relays, [{authors: [pub]}]) - let subs2 = pool.sub(relays, [{authors: [pub]}]) + let sub1 = pool.sub(relays, [{authors: [pub]}]) + let sub2 = pool.sub(relays, [{authors: [pub]}]) let received = [] - subs1.forEach(sub => - sub.on('event', event => { - received.push(event) - }) - ) - subs2.forEach(sub => - sub.on('event', event => { - received.push(event) - }) - ) + sub1.on('event', event => { + received.push(event) + }) + + sub2.on('event', event => { + received.push(event) + }) let event = { pubkey: pub, @@ -117,3 +87,37 @@ test('removing duplicates correctly when double querying', async () => { expect(received).toHaveLength(2) }) + +test('get()', async () => { + let event = await pool.get(relays, { + ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'] + }) + + expect(event).toHaveProperty( + 'id', + 'd7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027' + ) +}) + +test('list()', async () => { + let events = await pool.list( + [...relays, 'wss://offchain.pub', 'wss://eden.nostr.land'], + [ + { + authors: [ + '3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d' + ], + kinds: [1], + limit: 2 + } + ] + ) + + // the actual received number will be greater than 2, but there will be no duplicates + expect(events.length).toEqual( + events + .map(evt => evt.id) + .reduce((acc, n) => (acc.indexOf(n) !== -1 ? acc : [...acc, n]), []) + .length + ) +}) diff --git a/pool.ts b/pool.ts index 3ed55ac..b8256b3 100644 --- a/pool.ts +++ b/pool.ts @@ -12,7 +12,16 @@ export class SimplePool { defaultRelays.forEach(this.ensureRelay) } - ensureRelay(url: string): Relay { + async close(relays: string[]): Promise { + await Promise.all( + relays.map(async url => { + let relay = this._conn[normalizeURL(url)] + if (relay) await relay.close() + }) + ) + } + + async ensureRelay(url: string): Promise { const nm = normalizeURL(url) const existing = this._conn[nm] if (existing) return existing @@ -20,21 +29,74 @@ export class SimplePool { const relay = relayInit(nm) this._conn[nm] = relay + await relay.connect() + return relay } - sub(relays: string[], filters: Filter[], opts?: SubscriptionOptions): Sub[] { + sub(relays: string[], filters: Filter[], opts?: SubscriptionOptions): Sub { let _knownIds: Set = new Set() let modifiedOpts = opts || {} modifiedOpts.alreadyHaveEvent = id => _knownIds.has(id) - return relays.map(relay => { - let r = this._conn[relay] - if (!r) return badSub() + let subs: Sub[] = [] + let eventListeners: Set<(event: Event) => void> = new Set() + let eoseListeners: Set<() => void> = new Set() + let eosesMissing = relays.length + + let eoseSent = false + let eoseTimeout = setTimeout(() => { + eoseSent = true + for (let cb of eoseListeners.values()) { + cb() + } + }, 2400) + + relays.forEach(async relay => { + let r = await this.ensureRelay(relay) + if (!r) return let s = r.sub(filters, modifiedOpts) - s.on('event', (event: Event) => _knownIds.add(event.id as string)) - return s + s.on('event', (event: Event) => { + _knownIds.add(event.id as string) + for (let cb of eventListeners.values()) { + cb(event) + } + }) + s.on('eose', () => { + if (eoseSent) return + + eosesMissing-- + if (eosesMissing === 0) { + clearTimeout(eoseTimeout) + for (let cb of eoseListeners.values()) { + cb() + } + } + }) + subs.push(s) }) + + let greaterSub: Sub = { + sub(filters, opts) { + subs.forEach(sub => sub.sub(filters, opts)) + return greaterSub + }, + unsub() { + subs.forEach(sub => sub.unsub()) + }, + on(type, cb) { + if (type === 'event') { + eventListeners.add(cb) + } else if (type === 'eose') eoseListeners.add(cb) + }, + off(type, cb) { + if (type === 'event') { + eventListeners.delete(cb) + } else if (type === 'eose') eoseListeners.delete(cb) + } + } + + return greaterSub } get( @@ -43,19 +105,15 @@ export class SimplePool { opts?: SubscriptionOptions ): Promise { return new Promise(resolve => { - let subs = this.sub(relays, [filter], opts) + let sub = this.sub(relays, [filter], opts) let timeout = setTimeout(() => { - subs.forEach(sub => sub.unsub(), 1500) + sub.unsub() resolve(null) - }) - subs.forEach(sub => { - sub.on('event', (event: Event) => { - resolve(event) - clearTimeout(timeout) - subs.forEach(sub => { - sub.unsub() - }) - }) + }, 1500) + sub.on('event', (event: Event) => { + resolve(event) + clearTimeout(timeout) + sub.unsub() }) }) } @@ -66,42 +124,24 @@ export class SimplePool { opts?: SubscriptionOptions ): Promise { return new Promise(resolve => { - let _knownIds: Set = new Set() - let modifiedOpts = opts || {} - modifiedOpts.alreadyHaveEvent = id => _knownIds.has(id) - let events: Event[] = [] + let sub = this.sub(relays, filters, opts) - let subs = this.sub(relays, filters, modifiedOpts) - let timeout = setTimeout(() => { - subs.forEach(sub => sub.unsub(), 1500) - resolve(events) + sub.on('event', (event: Event) => { + events.push(event) }) - let pendingEoses = relays.length - - subs.forEach(sub => { - sub.on('event', (event: Event) => { - events.push(event) - }) - - sub.on('eose', () => { - pendingEoses-- - if (pendingEoses === 0) { - resolve(events) - clearTimeout(timeout) - subs.forEach(sub => { - sub.unsub() - }) - } - }) + // we can rely on an eose being emitted here because pool.sub() will fake one + sub.on('eose', () => { + sub.unsub() + resolve(events) }) }) } publish(relays: string[], event: Event): Pub[] { return relays.map(relay => { - let r = this._conn[relay] + let r = this._conn[normalizeURL(relay)] if (!r) return badPub(relay) let s = r.publish(event) return s @@ -109,17 +149,6 @@ export class SimplePool { } } -function badSub(): Sub { - return { - on() {}, - off() {}, - sub(): Sub { - return badSub() - }, - unsub() {} - } -} - function badPub(relay: string): Pub { return { on(typ, cb) { diff --git a/relay.test.js b/relay.test.js index 6aed124..04c8326 100644 --- a/relay.test.js +++ b/relay.test.js @@ -32,7 +32,7 @@ test('connectivity', () => { ).resolves.toBe(true) }) -test('querying', () => { +test('querying', async () => { var resolve1 var resolve2 @@ -52,16 +52,42 @@ test('querying', () => { resolve2(true) }) - return expect( - Promise.all([ - new Promise(resolve => { - resolve1 = resolve - }), - new Promise(resolve => { - resolve2 = resolve - }) - ]) - ).resolves.toEqual([true, true]) + let [t1, t2] = await Promise.all([ + new Promise(resolve => { + resolve1 = resolve + }), + new Promise(resolve => { + resolve2 = resolve + }) + ]) + + expect(t1).toEqual(true) + expect(t2).toEqual(true) +}) + +test('get()', async () => { + let event = await relay.get({ + ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'] + }) + + expect(event).toHaveProperty( + 'id', + 'd7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027' + ) +}) + +test('list()', async () => { + let events = await relay.list([ + { + authors: [ + '3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d' + ], + kinds: [1], + limit: 2 + } + ]) + + expect(events.length).toEqual(2) }) test('listening (twice) and publishing', async () => { diff --git a/relay.ts b/relay.ts index 6cecd28..5e6fea7 100644 --- a/relay.ts +++ b/relay.ts @@ -108,8 +108,12 @@ export function relayInit(url: string): Relay { let subid = getSubscriptionId(json) if (subid) { - let {alreadyHaveEvent} = openSubs[subid] - if (alreadyHaveEvent && alreadyHaveEvent(getHex64(json, 'id'))) { + let so = openSubs[subid] + if ( + so && + so.alreadyHaveEvent && + so.alreadyHaveEvent(getHex64(json, 'id')) + ) { return } } @@ -320,6 +324,7 @@ export function relayInit(url: string): Relay { }, connect, close(): Promise { + if (ws.readyState > 1) return Promise.resolve() ws.close() return new Promise(resolve => { resolveClose = resolve