     1	import { requirePackage } from 'hooked-api'
     2	
     3	const pendingBroadcasts = new WeakMap()
     4	
     5	function normalizeAuthContext (auth) {
     6	  if (!auth) return null
     7	  if (typeof auth !== 'object') return null
     8	  const normalized = { ...auth }
     9	  if (normalized.userId !== undefined && normalized.userId !== null) {
    10	    normalized.userId = String(normalized.userId)
    11	  }
    12	  if (normalized.roles && !Array.isArray(normalized.roles)) {
    13	    normalized.roles = [normalized.roles]
    14	  }
    15	  return normalized
    16	}
    17	
    18	function matchesFilters (record, filters, searchSchemaStructure) {
    19	  if (!filters || Object.keys(filters).length === 0) return true
    20	
    21	  if (!record) return false
    22	
    23	  if (!searchSchemaStructure || Object.keys(searchSchemaStructure).length === 0) {
    24	    return false
    25	  }
    26	
    27	  for (const [filterKey, filterValue] of Object.entries(filters)) {
    28	    const fieldDef = searchSchemaStructure[filterKey]
    29	    if (!fieldDef) continue
    30	
    31	    const fieldName = fieldDef.actualField || filterKey
    32	
    33	    let recordValue
    34	    if (fieldDef.isRelationship) {
    35	      const relationshipData = record.relationships?.[filterKey]?.data
    36	      recordValue = relationshipData?.id
    37	    } else {
    38	      recordValue = record.attributes?.[fieldName] ?? record[fieldName]
    39	    }
    40	
    41	    if (recordValue === null || recordValue === undefined) {
    42	      if (filterValue !== null && filterValue !== undefined) {
    43	        return false
    44	      }
    45	      continue
    46	    }
    47	
    48	    if (typeof fieldDef.filterOperator === 'function') {
    49	      if (!fieldDef.filterRecord?.(record, filterValue)) {
    50	        return false
    51	      }
    52	      continue
    53	    }
    54	
    55	    const operator = fieldDef.filterOperator || '='
    56	
    57	    switch (operator) {
    58	      case 'like':
    59	        if (!String(recordValue).toLowerCase().includes(String(filterValue).toLowerCase())) {
    60	          return false
    61	        }
    62	        break
    63	      case 'in':
    64	        if (Array.isArray(filterValue)) {
    65	          if (!filterValue.includes(recordValue)) {
    66	            return false
    67	          }
    68	        } else if (recordValue !== filterValue) {
    69	          return false
    70	        }
    71	        break
    72	      case 'between':
    73	        if (Array.isArray(filterValue) && filterValue.length === 2) {
    74	          if (recordValue < filterValue[0] || recordValue > filterValue[1]) {
    75	            return false
    76	          }
    77	        }
    78	        break
    79	      case '>':
    80	        if (!(recordValue > filterValue)) return false
    81	        break
    82	      case '>=':
    83	        if (!(recordValue >= filterValue)) return false
    84	        break
    85	      case '<':
    86	        if (!(recordValue < filterValue)) return false
    87	        break
    88	      case '<=':
    89	        if (!(recordValue <= filterValue)) return false
    90	        break
    91	      case '!=':
    92	      case '<>':
    93	        if (recordValue === filterValue) return false
    94	        break
    95	      case '=':
    96	      default:
    97	        if (fieldDef.isRelationship) {
    98	          if (String(recordValue) !== String(filterValue)) {
    99	            return false
   100	          }
   101	        } else if (recordValue !== filterValue) {
   102	          return false
   103	        }
   104	    }
   105	  }
   106	
   107	  return true
   108	}
   109	
   110	function buildConfig (pluginOptions = {}) {
   111	  const authOptions = pluginOptions.auth || {}
   112	  const subscriptionOptions = pluginOptions.subscriptions || {}
   113	
   114	  return {
   115	    auth: {
   116	      authenticate: authOptions.authenticate || null,
   117	      requireAuth: authOptions.requireAuth === true,
   118	      allowClientProvidedAuth: authOptions.allowClientProvidedAuth === true,
   119	      anonymousContext: authOptions.anonymousContext || null,
   120	      onAuthenticationFailed: authOptions.onAuthenticationFailed || null
   121	    },
   122	    subscriptions: {
   123	      maxPerSocket: subscriptionOptions.maxPerSocket ?? 100
   124	    },
   125	    transport: pluginOptions.transport || {}
   126	  }
   127	}
   128	
   129	async function authenticateSocket ({ socket, api, helpers, log, config }) {
   130	  const { authenticate, allowClientProvidedAuth, anonymousContext, requireAuth, onAuthenticationFailed } = config.auth
   131	
   132	  try {
   133	    let authContext = null
   134	
   135	    if (authenticate) {
   136	      authContext = await authenticate({ socket, api, helpers, log })
   137	    } else if (allowClientProvidedAuth && socket.handshake.auth && typeof socket.handshake.auth === 'object') {
   138	      authContext = socket.handshake.auth
   139	    } else if (anonymousContext) {
   140	      authContext = anonymousContext
   141	    }
   142	
   143	    const normalized = normalizeAuthContext(authContext)
   144	    if (!normalized && requireAuth) {
   145	      throw new Error('Authentication required')
   146	    }
   147	
   148	    return normalized
   149	  } catch (error) {
   150	    if (onAuthenticationFailed) {
   151	      try {
   152	        await onAuthenticationFailed({ socket, error, log })
   153	      } catch (hookError) {
   154	        log.error('socketio auth failure handler threw error', hookError)
   155	      }
   156	    }
   157	    throw error
   158	  }
   159	}
   160	
   161	async function registerSubscription ({
   162	  socket,
   163	  data,
   164	  scopes,
   165	  runHooks,
   166	  log,
   167	  config
   168	}) {
   169	  const { resource, filters = {}, include, fields, subscriptionId } = data || {}
   170	
   171	  if (!resource || typeof resource !== 'string') {
   172	    throw Object.assign(new Error('Resource is required'), { code: 'RESOURCE_REQUIRED' })
   173	  }
   174	
   175	  const scope = scopes[resource]
   176	  if (!scope) {
   177	    throw Object.assign(new Error(`Resource '${resource}' not found`), { code: 'RESOURCE_NOT_FOUND' })
   178	  }
   179	
   180	  const auth = socket.data.auth || null
   181	
   182	  if (scope.checkPermissions) {
   183	    await scope.checkPermissions({
   184	      method: 'query',
   185	      originalContext: { auth }
   186	    })
   187	  }
   188	
   189	  const schemaInfo = scope.vars?.schemaInfo
   190	  const searchSchemaStructure = schemaInfo?.searchSchemaStructure
   191	  const searchSchemaInstance = schemaInfo?.searchSchemaInstance
   192	
   193	  const validatedFilters = { ...filters }
   194	  if (Object.keys(validatedFilters).length > 0) {
   195	    if (!searchSchemaStructure || Object.keys(searchSchemaStructure).length === 0 || !searchSchemaInstance) {
   196	      throw Object.assign(new Error(`Filtering is not enabled for resource '${resource}'`), { code: 'FILTERING_NOT_ENABLED' })
   197	    }
   198	
   199	    for (const filterKey of Object.keys(validatedFilters)) {
   200	      const fieldDef = searchSchemaStructure[filterKey]
   201	      if (fieldDef && typeof fieldDef.filterOperator === 'function' && !fieldDef.filterRecord) {
   202	        throw Object.assign(
   203	          new Error(`Filter '${filterKey}' uses custom SQL logic and requires 'filterRecord' for real-time subscriptions`),
   204	          { code: 'UNSUPPORTED_FILTER' }
   205	        )
   206	      }
   207	    }
   208	
   209	    const { validatedObject, errors } = await searchSchemaInstance.validate(validatedFilters, {
   210	      onlyObjectValues: true
   211	    })
   212	
   213	    if (errors && Object.keys(errors).length > 0) {
   214	      const error = new Error('Invalid filter values')
   215	      error.code = 'INVALID_FILTERS'
   216	      error.details = errors
   217	      throw error
   218	    }
   219	
   220	    Object.assign(validatedFilters, validatedObject)
   221	  }
   222	
   223	  if (include !== undefined) {
   224	    if (!Array.isArray(include)) {
   225	      throw Object.assign(new Error('Include parameter must be an array'), { code: 'INVALID_INCLUDE' })
   226	    }
   227	
   228	    const relationships = schemaInfo?.schemaRelationships || {}
   229	    for (const includePath of include) {
   230	      const baseName = includePath.split('.')[0]
   231	      if (!relationships[baseName]) {
   232	        throw Object.assign(new Error(`Invalid relationship '${baseName}' for resource '${resource}'`), {
   233	          code: 'INVALID_INCLUDE'
   234	        })
   235	      }
   236	    }
   237	  }
   238	
   239	  if (fields !== undefined) {
   240	    if (typeof fields !== 'object' || Array.isArray(fields) || fields === null) {
   241	      throw Object.assign(new Error('Fields parameter must be an object'), { code: 'INVALID_FIELDS' })
   242	    }
   243	
   244	    for (const [resourceType, fieldList] of Object.entries(fields)) {
   245	      if (!Array.isArray(fieldList)) {
   246	        throw Object.assign(new Error(`Fields for '${resourceType}' must be an array`), { code: 'INVALID_FIELDS' })
   247	      }
   248	    }
   249	  }
   250	
   251	  if (!socket.data.subscriptions) {
   252	    socket.data.subscriptions = new Map()
   253	  }
   254	
   255	  if (socket.data.subscriptions.size >= config.subscriptions.maxPerSocket) {
   256	    throw Object.assign(new Error('Subscription limit reached for this connection'), { code: 'SUBSCRIPTION_LIMIT' })
   257	  }
   258	
   259	  const subscription = {
   260	    id: subscriptionId || `${resource}-${Date.now()}-${Math.random().toString(36).slice(2, 10)}`,
   261	    resource,
   262	    filters: validatedFilters,
   263	    include: include || [],
   264	    fields: fields || {},
   265	    auth,
   266	    createdAt: new Date()
   267	  }
   268	
   269	  await runHooks('subscriptionFilters', { subscription, auth })
   270	
   271	  const roomName = `${resource}:updates`
   272	  socket.join(roomName)
   273	  socket.data.subscriptions.set(subscription.id, subscription)
   274	
   275	  log.info(`Socket ${socket.id} subscribed to ${resource}`, {
   276	    subscriptionId: subscription.id,
   277	    filters: subscription.filters
   278	  })
   279	
   280	  return {
   281	    subscriptionId: subscription.id,
   282	    resource: subscription.resource,
   283	    filters: subscription.filters,
   284	    include: subscription.include,
   285	    fields: subscription.fields,
   286	    status: 'active'
   287	  }
   288	}
   289	
   290	async function handleRestoreSubscriptions ({ socket, subscriptions, scopes, runHooks, log, config }) {
   291	  const restored = []
   292	  const failed = []
   293	
   294	  for (const sub of subscriptions) {
   295	    try {
   296	      const response = await registerSubscription({
   297	        socket,
   298	        data: sub,
   299	        scopes,
   300	        runHooks,
   301	        log,
   302	        config
   303	      })
   304	      restored.push(response.subscriptionId)
   305	    } catch (error) {
   306	      failed.push({
   307	        subscriptionId: sub?.subscriptionId || null,
   308	        error: {
   309	          code: error.code || 'SUBSCRIBE_ERROR',
   310	          message: error.message
   311	        }
   312	      })
   313	    }
   314	  }
   315	
   316	  return { restored, failed }
   317	}
   318	
   319	function getRecordForFiltering (context) {
   320	  return context?.minimalRecord || context?.originalMinimalRecord || null
   321	}
   322	
   323	async function performBroadcast ({ method, scopeName, id, context, api, io, log }) {
   324	  if (!io) return
   325	
   326	  const scope = api.resources[scopeName]
   327	  if (!scope) {
   328	    log.warn(`Socket.IO broadcast skipped: unknown resource ${scopeName}`)
   329	    return
   330	  }
   331	
   332	  const roomName = `${scopeName}:updates`
   333	  const socketsInRoom = await io.in(roomName).fetchSockets()
   334	  if (!socketsInRoom || socketsInRoom.length === 0) {
   335	    log.debug(`Socket.IO broadcast skipped: no subscribers for ${roomName}`)
   336	    return
   337	  }
   338	
   339	  const schemaInfo = scope.vars?.schemaInfo
   340	  const searchSchemaStructure = schemaInfo?.searchSchemaStructure
   341	  const recordForFiltering = getRecordForFiltering(context)
   342	
   343	  for (const socket of socketsInRoom) {
   344	    const subscriptions = socket.data.subscriptions
   345	    if (!subscriptions || subscriptions.size === 0) continue
   346	
   347	    for (const subscription of subscriptions.values()) {
   348	      if (subscription.resource !== scopeName) continue
   349	
   350	      if (recordForFiltering && subscription.filters && Object.keys(subscription.filters).length > 0) {
   351	        if (!matchesFilters(recordForFiltering, subscription.filters, searchSchemaStructure)) {
   352	          continue
   353	        }
   354	      }
   355	
   356	      const notification = {
   357	        type: `resource.${method}d`,
   358	        resource: scopeName,
   359	        id,
   360	        action: method,
   361	        subscriptionId: subscription.id,
   362	        meta: {
   363	          timestamp: new Date().toISOString()
   364	        }
   365	      }
   366	
   367	      if (method === 'delete') {
   368	        notification.deletedRecord = { id }
   369	      }
   370	
   371	      socket.emit('subscription.update', notification)
   372	      log.debug(`Socket.IO broadcast: ${scopeName}/${id} -> socket ${socket.id}`)
   373	      break
   374	    }
   375	  }
   376	}
   377	
   378	export const SocketIOPlugin = {
   379	  name: 'socketio',
   380	  dependencies: ['rest-api'],
   381	
   382	  async install ({ api, addHook, log, scopes, helpers, vars, runHooks, pluginOptions = {} }) {
   383	    const config = buildConfig(pluginOptions)
   384	
   385	    let Server
   386	    try {
   387	      ({ Server } = await import('socket.io'))
   388	    } catch (error) {
   389	      requirePackage('socket.io', 'socketio', 'Socket.IO is required for WebSocket support. This is a peer dependency.')
   390	      throw error
   391	    }
   392	
   393	    let createAdapter
   394	    let createClient
   395	    let io
   396	
   397	    api.startSocketServer = async (server, startOptions = {}) => {
   398	      const transportConfig = { ...config.transport, ...startOptions.transport }
   399	      const authConfig = { ...config.auth, ...(startOptions.auth || {}) }
   400	      const redisConfig = startOptions.redis ?? pluginOptions.redis ?? null
   401	
   402	      config.auth = authConfig
   403	
   404	      const defaultPath = vars.transport?.mountPath ? `${vars.transport.mountPath}/socket.io` : '/socket.io'
   405	      const path = startOptions.path || config.transport.path || defaultPath
   406	      const cors = startOptions.cors || config.transport.cors || { origin: '*', methods: ['GET', 'POST'] }
   407	
   408	      io = new Server(server, {
   409	        path,
   410	        cors,
   411	        transports: ['websocket', 'polling']
   412	      })
   413	
   414	      vars.socketIO = io
   415	      api.io = io
   416	
   417	      if (redisConfig) {
   418	        try {
   419	          ({ createClient } = await import('redis'))
   420	        } catch (error) {
   421	          requirePackage('redis', 'socketio', 'Redis is required for Socket.IO horizontal scaling. This is a peer dependency.')
   422	          throw error
   423	        }
   424	
   425	        try {
   426	          ({ createAdapter } = await import('@socket.io/redis-adapter'))
   427	        } catch (error) {
   428	          requirePackage('@socket.io/redis-adapter', 'socketio',
   429	            'Socket.IO Redis adapter is required for horizontal scaling. This is a peer dependency.')
   430	          throw error
   431	        }
   432	
   433	        const pubClient = createClient(redisConfig)
   434	        const subClient = pubClient.duplicate()
   435	
   436	        await Promise.all([
   437	          pubClient.connect(),
   438	          subClient.connect()
   439	        ])
   440	
   441	        io.adapter(createAdapter(pubClient, subClient))
   442	        vars.socketIORedisClients = { pubClient, subClient }
   443	        log.info('Socket.IO configured with Redis adapter')
   444	      }
   445	
   446	      io.use(async (socket, next) => {
   447	        try {
   448	          const authContext = await authenticateSocket({ socket, api, helpers, log, config })
   449	          socket.data.auth = authContext
   450	          socket.data.subscriptions = new Map()
   451	          next()
   452	        } catch (error) {
   453	          log.warn('Socket.IO authentication failed', error)
   454	          next(new Error(error.message || 'Authentication failed'))
   455	        }
   456	      })
   457	
   458	      io.on('connection', (socket) => {
   459	        log.info(`Socket connected: ${socket.id}`, {
   460	          userId: socket.data.auth?.userId ?? null
   461	        })
   462	
   463	        socket.emit('connected', {
   464	          socketId: socket.id,
   465	          serverTime: new Date().toISOString()
   466	        })
   467	
   468	        socket.on('subscribe', async (payload, callback) => {
   469	          try {
   470	            const result = await registerSubscription({
   471	              socket,
   472	              data: payload,
   473	              scopes,
   474	              runHooks,
   475	              log,
   476	              config
   477	            })
   478	
   479	            if (callback) callback({ success: true, data: result })
   480	            else socket.emit('subscription.created', result)
   481	          } catch (error) {
   482	            log.error('Socket.IO subscribe error', error)
   483	            const response = {
   484	              error: {
   485	                code: error.code || 'SUBSCRIBE_ERROR',
   486	                message: error.message
   487	              }
   488	            }
   489	            if (callback) callback(response)
   490	            else socket.emit('subscription.error', response.error)
   491	          }
   492	        })
   493	
   494	        socket.on('unsubscribe', (payload, callback) => {
   495	          try {
   496	            const subscriptionId = payload?.subscriptionId
   497	            if (!subscriptionId) {
   498	              const error = { code: 'MISSING_SUBSCRIPTION_ID', message: 'Subscription ID is required' }
   499	              if (callback) callback({ error })
   500	              return
   501	            }
   502	
   503	            const subscription = socket.data.subscriptions?.get(subscriptionId)
   504	            if (!subscription) {
   505	              const error = { code: 'SUBSCRIPTION_NOT_FOUND', message: 'Subscription not found' }
   506	              if (callback) callback({ error })
   507	              return
   508	            }
   509	
   510	            socket.data.subscriptions.delete(subscriptionId)
   511	            const hasOther = Array.from(socket.data.subscriptions.values())
   512	              .some((sub) => sub.resource === subscription.resource)
   513	            if (!hasOther) {
   514	              socket.leave(`${subscription.resource}:updates`)
   515	            }
   516	
   517	            if (callback) callback({ success: true })
   518	            log.info(`Socket ${socket.id} unsubscribed from ${subscription.resource}`, { subscriptionId })
   519	          } catch (error) {
   520	            log.error('Socket.IO unsubscribe error', error)
   521	            if (callback) {
   522	              callback({
   523	                error: {
   524	                  code: 'UNSUBSCRIBE_ERROR',
   525	                  message: error.message
   526	                }
   527	              })
   528	            }
   529	          }
   530	        })
   531	
   532	        socket.on('restore-subscriptions', async (payload, callback) => {
   533	          try {
   534	            const subscriptions = payload?.subscriptions
   535	            if (!Array.isArray(subscriptions)) {
   536	              const error = { code: 'INVALID_DATA', message: 'Subscriptions must be an array' }
   537	              if (callback) callback({ error })
   538	              return
   539	            }
   540	
   541	            const result = await handleRestoreSubscriptions({
   542	              socket,
   543	              subscriptions,
   544	              scopes,
   545	              runHooks,
   546	              log,
   547	              config
   548	            })
   549	
   550	            if (callback) callback({ success: true, ...result })
   551	          } catch (error) {
   552	            log.error('Socket.IO restore subscriptions error', error)
   553	            if (callback) {
   554	              callback({
   555	                error: {
   556	                  code: 'RESTORE_ERROR',
   557	                  message: error.message
   558	                }
   559	              })
   560	            }
   561	          }
   562	        })
   563	
   564	        socket.on('disconnect', (reason) => {
   565	          log.info(`Socket disconnected: ${socket.id}`, {
   566	            reason,
   567	            subscriptionCount: socket.data.subscriptions?.size || 0
   568	          })
   569	        })
   570	      })
   571	
   572	      log.info('Socket.IO server started', { path })
   573	      return io
   574	    }
   575	
   576	    addHook('finish', 'socketio-broadcast', {}, async ({ context }) => {
   577	      const { method, scopeName, id } = context
   578	
   579	      if (!io) {
   580	        log.debug('Socket.IO broadcast skipped: server not started')
   581	        return
   582	      }
   583	
   584	      if (!['post', 'put', 'patch', 'delete'].includes(method)) {
   585	        return
   586	      }
   587	
   588	      if (!id && method !== 'delete') {
   589	        log.debug('Socket.IO broadcast skipped: missing record id')
   590	        return
   591	      }
   592	
   593	      if (context.transaction) {
   594	        if (!pendingBroadcasts.has(context.transaction)) {
   595	          pendingBroadcasts.set(context.transaction, [])
   596	        }
   597	        pendingBroadcasts.get(context.transaction).push({ method, scopeName, id, context })
   598	        return
   599	      }
   600	
   601	      await performBroadcast({ method, scopeName, id, context, api, io, log })
   602	    })
   603	
   604	    addHook('afterCommit', 'socketio-broadcast-deferred', {}, async ({ context }) => {
   605	      if (!context?.transaction) return
   606	      const broadcasts = pendingBroadcasts.get(context.transaction)
   607	      if (!broadcasts) return
   608	
   609	      for (const broadcast of broadcasts) {
   610	        await performBroadcast({ ...broadcast, api, io, log })
   611	      }
   612	
   613	      pendingBroadcasts.delete(context.transaction)
   614	    })
   615	
   616	    addHook('afterRollback', 'socketio-cleanup-broadcasts', {}, async ({ context }) => {
   617	      if (!context?.transaction) return
   618	      pendingBroadcasts.delete(context.transaction)
   619	    })
   620	  }
   621	}
