You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

447 lines
13 KiB

import WebSocket from 'ws';
import {v4 as uuid} from 'uuid';
import {Session, ISessionSessionMessageEvent, ISessionUserMessageEvent, ISessionRoomJoinedEvent, ISessionRoomLeftEvent} from './Session';
import {BackendApiRequestHelper} from './BackendApiRequestHelper';
import {RoomApiHandler, RoomApiHandlerRoomListEvent, IRoomApiHandlerParticipantsListEvent, IRoomApiHandlerRoomDeletedEvent, IRoomApiHandlerRoomMessageEvent} from './RoomApiHandler';
import {AbstractWebRtcServerHelper} from './AbstractWebRtcServerHelper';
import {RoomEventType, EventData, ParticipantsListEventData, ParticipantsListEventType, RoomEventData} from '../types/Events';
export class SessionManager {
private sessions: Array<Session>;
private webRtcServerHelper?: AbstractWebRtcServerHelper;
private roomApiHandler: RoomApiHandler;
private backendApiRequestHelper: BackendApiRequestHelper;
private onSocketMessage?: (message: string) => void;
constructor({ webSocketServer, webRtcServerHelper, roomApiHandler, backendApiRequestHelper }: IConstructorServices) {
this.sessions = [];
this.webRtcServerHelper = webRtcServerHelper;
this.roomApiHandler = roomApiHandler;
this.backendApiRequestHelper = backendApiRequestHelper;
webSocketServer.on('connection', this.onSocketConnection.bind(this));
this.roomApiHandler.on('roomListEvent', this.onRoomListEvent.bind(this));
this.roomApiHandler.on('participantsListEvent', this.onParticipantsListEvent.bind(this));
this.roomApiHandler.on('roomMessage', this.onRoomMessage.bind(this));
this.roomApiHandler.on('roomDeleted', this.onRoomDeleted.bind(this));
}
public getSessionById(sessionId: string): Session|null {
return this.sessions.find(session => session.getId() === sessionId) || null;
}
public getSessionsByIds(sessionIds: Array<string>): Array<Session> {
return this.sessions.filter(session => sessionIds.includes(session.getId()));
}
public getSessionByResumeId(resumeId: string): Session|null {
return this.sessions.find(session => session.getResumeId() === resumeId) || null;
}
public getSessionByNcSessionId(ncSessionId: string): Session|null {
return this.sessions.find(session => session.getNcSessionId() === ncSessionId) || null;
}
public getSessionsForRoomId(roomId: string): Array<Session> {
return this.sessions.filter(session => session.getRoomId() === roomId);
}
public getSessionsForUserId(userId: string): Array<Session> {
return this.sessions.filter(session => session.getUserId() === userId);
}
public getSessionsForUserIds(userIds: Array<string>): Array<Session> {
return this.sessions.filter(session => userIds.includes(session.getUserId() || ''));
}
private createSession(sessionCreationData: ISessionCreationData): Session {
const newSession = new Session({
sessionId: this.getNewSessionId(),
socket: sessionCreationData.socket
}, {
backendApiRequestHelper: sessionCreationData.backendApiRequestHelper
});
if(sessionCreationData.ncUserId) {
newSession.setUserId(sessionCreationData.ncUserId);
}
if(this.webRtcServerHelper) {
this.webRtcServerHelper.createSession(newSession.getId());
}
newSession.on('socketClosed', () => {
this.log('destroy session', newSession.getId());
this.deleteSession(newSession);
});
newSession.on('sessionMessage', this.onSessionMessage.bind(this));
newSession.on('userMessage', this.onUserMessage.bind(this));
newSession.on('roomMessage', this.onRoomMessage.bind(this));
newSession.on('roomJoined', this.onRoomJoined.bind(this, newSession));
newSession.on('roomLeft', this.onRoomLeft.bind(this, newSession));
this.sessions.push(newSession);
return newSession;
}
private getNewSessionId(): string {
return uuid();
}
public deleteSession(session: Session): void {
this.deleteSessionById(session.getId());
}
public deleteSessionById(sessionId: string): void {
const index = this.sessions.findIndex(session => session.getId() === sessionId);
if(index >= 0) {
const sessionToDelete = this.sessions[index];
this.webRtcServerHelper?.destroySession(sessionToDelete.getId());
this.sessions.splice(index, 1);
}
}
private onSocketConnection(socket: WebSocket): void {
this.log('socket connected...');
this.onSocketMessage = (message: string): void => {
this.handleSocketMessage(socket, message);
};
socket.on('message', this.onSocketMessage);
}
private removeListenerOfSocket(socket: WebSocket): void {
if(this.onSocketMessage) socket.off('message', this.onSocketMessage);
}
private handleSocketMessage(socket: WebSocket, message: string): void {
const data: Message = JSON.parse(message);
if(data.type !== 'hello') {
this.log('no hello message received yet, ignoring...', data);
return;
}
this.handleHelloMessage(socket, data);
}
private async handleHelloMessage(socket: WebSocket, data: IHelloMessage): Promise<void> {
this.log('HELLO:', data);
if('resumeid' in data.hello) {
this.log('RESUMING SESSION...');
const session = this.getSessionByResumeId(data.hello.resumeid);
if(!session) {
this.sendErrorMessage(socket, { code: 'no_such_session' }, data.id);
return;
}
session.resumeSession(socket);
session.sendSocketMessage({
id: data.id,
type: 'hello',
hello: {
version: '1.0',
sessionid: session.getId(),
resumeid: session.getResumeId()
}
});
this.removeListenerOfSocket(socket);
return;
}
this.backendApiRequestHelper.setUrl(data.hello.auth.url);
const backendResponse = await this.backendApiRequestHelper.sendAuthRequest({
version: '1.0',
...data.hello.auth
});
const userId = backendResponse.userid;
const session = this.createSession({
socket: socket,
ncUserId: userId,
backendApiRequestHelper: this.backendApiRequestHelper,
webRtcServerHelper: this.webRtcServerHelper
});
const features = [];
if(this.webRtcServerHelper) features.push('mcu');
const response: IHelloMessageResponse = {
id: data.id,
type: 'hello',
hello: {
sessionid: session.getId(),
resumeid: session.getResumeId(),
userid: userId,
version: '1.0',
server: {
version: process.env.VERSION || 'unknown',
features: features
}
}
};
session.sendSocketMessage(response);
this.removeListenerOfSocket(socket);
}
private sendErrorMessage(socket: WebSocket, error: any, messageId?: string): void {
const errorMessageData = {
id: messageId,
type: 'error',
error: error
};
this.sendSocketMessage(socket, errorMessageData);
}
private sendSocketMessage(socket: WebSocket, data: Object): void {
socket.send(JSON.stringify(data));
}
private onRoomListEvent(data: RoomApiHandlerRoomListEvent): void {
const sessions = 'sessionIds' in data ? this.getSessionsByIds(data.sessionIds) : this.getSessionsForUserIds(data.userIds);
sessions.forEach(session => {
this.sendEventMessage(session, data.eventData);
});
}
private onParticipantsListEvent(data: IRoomApiHandlerParticipantsListEvent): void {
const changedParticipants = data.eventData.update.users.map(item => {
const session = this.getSessionByNcSessionId(item.sessionId);
if(!session) {
return item;
}
const sessionId = session.getId();
return {
...item,
sessionId: sessionId
};
});
const sendData: ParticipantsListEventData = {
target: 'participants',
type: ParticipantsListEventType.UPDATE,
update: {
...data.eventData.update,
users: changedParticipants
}
};
const sessions = this.getSessionsForRoomId(data.eventData.update.roomid);
sessions.forEach(session => {
this.sendEventMessage(session, sendData);
});
changedParticipants.forEach(participant => {
if(!participant.inCall) this.webRtcServerHelper?.unpublish(participant.sessionId);
});
}
private sendRoomEventToRoomSessions(roomId: string, eventData: RoomEventData): void {
const roomSessions = this.getSessionsForRoomId(roomId);
roomSessions.forEach((session) => {
this.sendEventMessage(session, eventData);
});
}
private sendEventMessage(session: Session, eventData: EventData): void {
const sendData = {
type: 'event',
event: eventData
};
session.sendSocketMessage(sendData);
}
private async onSessionMessage(data: ISessionSessionMessageEvent): Promise<void> {
this.log('MESSAGE:', data);
if(data.recipient.type !== 'session') return; // TODO check better?
const session = this.getSessionById(data.recipient.sessionid) || this.getSessionByNcSessionId(recipient.sessionid);
if(!session) return; // TODO Error handling?
const senderSession = this.getSessionById(data.senderSessionId);
const payload = data.data.payload;
switch(data.data.type) {
case 'offer':
senderSession.sendMessage(data.recipient, {
type: 'answer',
roomType: data.data.roomType,
payload: {
type: 'answer',
sdp: await this.handleOffer(session, payload),
nick: payload.nick
}
});
break;
case 'candidate':
await this.handleCandidate(session, payload.candidate);
break;
case 'requestoffer':
senderSession.sendMessage(data.recipient, {
type: 'offer',
from: data.data.to,
roomType: 'video',
payload: {
type: 'offer',
sdp: await this.handleOfferRequest(session, data.senderSessionId),
// nick: payload.nick
}
});
break;
case 'answer':
await this.handleAnswer(session, data.senderSessionId, payload);
break;
default:
}
}
private async handleOffer(session: Session, payload: any): Promise<string|null> {
const roomId = session.getRoomId();
if(!roomId) {
console.error('we are not joined yet!');
return null;
}
return this.webRtcServerHelper?.publishInRoom(session.getId(), roomId, payload.sdp) || null;
}
private async handleCandidate(session: Session, candidate: any): Promise<void> {
return await this.webRtcServerHelper?.trickleCandidate(session.getId(), candidate);
}
private async handleOfferRequest(session: Session, senderSessionId: string): Promise<string|null> {
const roomId = session.getRoomId();
if(!roomId) {
console.error('we are not joined yet!');
return null;
}
return await this.webRtcServerHelper?.subscribeToFeedOfRoom(senderSessionId, roomId, session.getId()) || null;
}
private async handleAnswer(session: Session, senderSessionId: string, payload: any): Promise<void> {
await this.webRtcServerHelper?.setAnswerToSubscription(senderSessionId, session.getId(), payload.sdp);
}
private onUserMessage(data: ISessionUserMessageEvent): void {
const sessions = this.getSessionsForUserId(data.userId);
sessions.forEach((session) => {
session.sendSocketMessage(data.data);
});
}
private onRoomMessage(data: IRoomApiHandlerRoomMessageEvent): void {
const sessions = this.getSessionsForRoomId(data.roomId);
sessions.forEach((session) => {
session.sendSocketMessage(data.data);
});
}
private onRoomJoined(session: Session, data: ISessionRoomJoinedEvent): void {
this.sendRoomEventToRoomSessions(data.roomId, {
target: 'room',
type: RoomEventType.JOIN,
join: [ { sessionid: session.getId() } ]
});
this.webRtcServerHelper?.joinRoom(session.getId(), data.roomId);
}
private onRoomLeft(session: Session, data: ISessionRoomLeftEvent): void {
this.sendRoomEventToRoomSessions(data.roomId, {
target: 'room',
type: RoomEventType.LEAVE,
leave: [ session.getId() ]
});
this.webRtcServerHelper?.leaveRoom(session.getId());
}
private onRoomDeleted(data: IRoomApiHandlerRoomDeletedEvent): void {
this.webRtcServerHelper?.deleteRoom(data.roomId);
}
private log(message: string, data?: any): void {
console.log(`SESSION MANAGER: ${message}`, data || '');
}
}
interface IConstructorServices {
webSocketServer: WebSocket.Server,
webRtcServerHelper?: AbstractWebRtcServerHelper,
roomApiHandler: RoomApiHandler,
backendApiRequestHelper: BackendApiRequestHelper
}
interface IHelloMessage {
id: string,
type: 'hello',
hello: HelloMessageData
}
type HelloMessageData = IResumeHelloMessageData | IAuthHelloMessageData;
interface IResumeHelloMessageData {
version: '1.0',
resumeid: string
}
interface IAuthHelloMessageData {
version: '1.0',
auth: IAuthMessageData
}
interface IAuthMessageData {
url: string,
params: IAuthParams
}
interface IHelloMessageResponse {
id: string,
type: 'hello',
hello: IHelloMessageResponseData
}
interface IHelloMessageResponseData {
sessionid: string,
resumeid: string,
userid?: string,
version: '1.0',
server: {
version: string,
features: Array<string>
}
}
interface ISessionCreationData {
ncUserId?: string,
socket: WebSocket,
backendApiRequestHelper: BackendApiRequestHelper,
webRtcServerHelper?: AbstractWebRtcServerHelper
}