fix socket connection

This commit is contained in:
Simon Larsen
2023-11-16 17:16:37 +00:00
parent 08d13997fe
commit 12df8275a5
19 changed files with 304 additions and 146 deletions

View File

@@ -9,7 +9,7 @@ import EnableWorkflowOn from '../Types/BaseDatabase/EnableWorkflowOn';
import ObjectID from '../Types/ObjectID';
import CommonModel from './CommonModel';
import Route from '../Types/API/Route';
import EnableRealtimeEventsOn from '../Utils/Realtime';
import {EnableRealtimeEventsOn} from '../Utils/Realtime';
export default class AnalyticsBaseModel extends CommonModel {
public constructor(data: {

View File

@@ -28,4 +28,4 @@ export const AdminDashboardRoute: Route = new Route('/admin');
export const IngestorRoute: Route = new Route('/ingestor');
export const RealtimeRoute: Route = new Route('/realtime');
export const RealtimeRoute: Route = new Route('/realtime/socket');

View File

@@ -1,5 +1,6 @@
import DatabaseType from '../Types/BaseDatabase/DatabaseType';
import { JSONObject } from '../Types/JSON';
import ObjectID from '../Types/ObjectID';
export enum EventName {
ListenToModalEvent = 'ListenToModelEvent',
@@ -20,9 +21,19 @@ export interface ListenToModelEventJSON {
select: JSONObject;
}
export default interface EnableRealtimeEventsOn {
export interface EnableRealtimeEventsOn {
create?: boolean | undefined;
update?: boolean | undefined;
delete?: boolean | undefined;
read?: boolean | undefined;
}
export default class RealtimeUtil {
public static getRoomId(
tenantId: string | ObjectID,
modelName: string,
eventType: ModelEventType
): string {
return tenantId.toString() + '-' + modelName + '-' + eventType;
}
}

View File

@@ -1,39 +1,55 @@
import SocketIO from 'socket.io';
import http from 'http';
import Express, { ExpressApplication } from '../Utils/Express';
import Express from '../Utils/Express';
import { createAdapter } from '@socket.io/redis-adapter';
import Redis, { ClientType } from './Redis';
import DatabaseNotConnectedException from 'Common/Types/Exception/DatabaseNotConnectedException';
const app: ExpressApplication = Express.getExpressApp();
const server: http.Server = http.createServer(app);
import { RealtimeRoute } from 'Common/ServiceRoute';
import logger from '../Utils/Logger';
// import { RealtimeRoute } from 'Common/ServiceRoute';
export type Socket = SocketIO.Socket;
export type SocketServer = SocketIO.Server;
const io: SocketIO.Server = new SocketIO.Server(server, {
path: '/realtime/socket',
transports: ['websocket', 'polling'], // Using websocket does not require sticky session
perMessageDeflate: {
threshold: 1024, // Defaults to 1024
zlibDeflateOptions: {
chunkSize: 16 * 1024, // Defaults to 16 * 1024
},
zlibInflateOptions: {
windowBits: 15, // Defaults to 15
memLevel: 8, // Defaults to 8
},
},
});
export default abstract class IO {
if (!Redis.getClient()) {
throw new DatabaseNotConnectedException(
'Redis is not connected. Please connect to Redis before connecting to SocketIO.'
);
private static socketServer: SocketIO.Server | null = null;
public static init() {
const server: http.Server = Express.getHttpServer();
this.socketServer = new SocketIO.Server(server, {
path: RealtimeRoute.toString()
});
this.socketServer.on("connection", (_socket) => {
logger.info("Client socket connected!");
});
if (!Redis.getClient()) {
throw new DatabaseNotConnectedException(
'Redis is not connected. Please connect to Redis before connecting to SocketIO.'
);
}
const pubClient: ClientType = Redis.getClient()!.duplicate();
const subClient: ClientType = Redis.getClient()!.duplicate();
this.socketServer.adapter(createAdapter(pubClient, subClient));
}
public static getSocketServer(): SocketIO.Server | null {
if (!this.socketServer) {
this.init();
}
return this.socketServer;
}
}
const pubClient: ClientType = Redis.getClient()!.duplicate();
const subClient: ClientType = Redis.getClient()!.duplicate();
io.adapter(createAdapter(pubClient, subClient));
export default io;

View File

@@ -53,14 +53,14 @@ import { ModelEventType } from 'Common/Utils/Realtime';
export default class AnalyticsDatabaseService<
TBaseModel extends AnalyticsBaseModel
> extends BaseService {
public modelType!: { new (): TBaseModel };
public modelType!: { new(): TBaseModel };
public database!: ClickhouseDatabase;
public model!: TBaseModel;
public databaseClient!: ClickhouseClient;
public statementGenerator!: StatementGenerator<TBaseModel>;
public constructor(data: {
modelType: { new (): TBaseModel };
modelType: { new(): TBaseModel };
database?: ClickhouseDatabase | undefined;
}) {
super();
@@ -272,12 +272,10 @@ export default class AnalyticsDatabaseService<
this.useDefaultDatabase();
}
let statement: string = `SELECT count() FROM ${
this.database.getDatasourceOptions().database
}.${this.model.tableName}
${
Object.keys(countBy.query).length > 0 ? 'WHERE' : ''
} ${this.statementGenerator.toWhereStatement(countBy.query)}
let statement: string = `SELECT count() FROM ${this.database.getDatasourceOptions().database
}.${this.model.tableName}
${Object.keys(countBy.query).length > 0 ? 'WHERE' : ''
} ${this.statementGenerator.toWhereStatement(countBy.query)}
`;
if (countBy.limit) {
@@ -308,12 +306,10 @@ export default class AnalyticsDatabaseService<
const select: { statement: string; columns: Array<string> } =
this.statementGenerator.toSelectStatement(findBy.select!);
const statement: string = `SELECT ${select.statement} FROM ${
this.database.getDatasourceOptions().database
}.${this.model.tableName}
${
Object.keys(findBy.query).length > 0 ? 'WHERE' : ''
} ${this.statementGenerator.toWhereStatement(findBy.query)}
const statement: string = `SELECT ${select.statement} FROM ${this.database.getDatasourceOptions().database
}.${this.model.tableName}
${Object.keys(findBy.query).length > 0 ? 'WHERE' : ''
} ${this.statementGenerator.toWhereStatement(findBy.query)}
ORDER BY ${this.statementGenerator.toSortStatemennt(findBy.sort!)}
LIMIT ${findBy.limit.toString()}
OFFSET ${findBy.skip.toString()}
@@ -330,11 +326,9 @@ export default class AnalyticsDatabaseService<
this.useDefaultDatabase();
}
let statement: string = `ALTER TABLE ${
this.database.getDatasourceOptions().database
}.${this.model.tableName}
DELETE ${
Object.keys(deleteBy.query).length > 0 ? 'WHERE' : 'WHERE 1=1'
let statement: string = `ALTER TABLE ${this.database.getDatasourceOptions().database
}.${this.model.tableName}
DELETE ${Object.keys(deleteBy.query).length > 0 ? 'WHERE' : 'WHERE 1=1'
} ${this.statementGenerator.toWhereStatement(deleteBy.query)}
`;
@@ -604,16 +598,16 @@ export default class AnalyticsDatabaseService<
const onCreate: OnCreate<TBaseModel> = createBy.props.ignoreHooks
? {
createBy: {
data: data,
props: createBy.props,
},
carryForward: [],
}
createBy: {
data: data,
props: createBy.props,
},
carryForward: [],
}
: await this._onBeforeCreate({
data: data,
props: createBy.props,
});
data: data,
props: createBy.props,
});
data = onCreate.createBy.data;
@@ -686,25 +680,33 @@ export default class AnalyticsDatabaseService<
}
}
// emit realtime events to the client.
if (
this.getModel().enableRealtimeEventsOn?.create &&
createBy.props.tenantId
) {
const promises: Array<Promise<void>> = [];
for (const item of items) {
promises.push(
Realtime.emitModelEvent({
model: item,
tenantId: createBy.props.tenantId,
eventType: ModelEventType.Create,
modelType: this.modelType,
})
);
if (Realtime.isInitialized()) {
const promises: Array<Promise<void>> = [];
for (const item of items) {
promises.push(
Realtime.emitModelEvent({
model: item,
tenantId: createBy.props.tenantId,
eventType: ModelEventType.Create,
modelType: this.modelType,
})
);
}
await Promise.allSettled(promises);
} else {
logger.warn(`Realtime is not initialized. Skipping emitModelEvent for ${this.getModel().tableName}`)
}
await Promise.allSettled(promises);
}
return createBy.items;

View File

@@ -11,6 +11,7 @@ import {
import UserType from 'Common/Types/UserType';
import Dictionary from 'Common/Types/Dictionary';
import Port from 'Common/Types/Port';
import { Server, createServer } from "http";
export type RequestHandler = express.RequestHandler;
export type NextFunction = express.NextFunction;
@@ -47,6 +48,7 @@ export interface OneUptimeResponse extends express.Response {
class Express {
private static app: express.Application;
private static httpServer: Server;
public static getRouter(): express.Router {
return express.Router();
@@ -56,6 +58,10 @@ class Express {
this.app = express();
}
public static getHttpServer(): Server {
return this.httpServer;
}
public static getExpressApp(): express.Application {
if (!this.app) {
this.setupExpress();
@@ -66,14 +72,19 @@ class Express {
public static async launchApplication(
appName: string,
port?: Port
port?: Port,
): Promise<express.Application> {
if (!this.app) {
this.setupExpress();
}
if(!this.httpServer){
this.httpServer = createServer(this.app);
}
return new Promise<express.Application>((resolve: Function) => {
this.app.listen(port?.toNumber() || this.app.get('port'), () => {
this.httpServer.listen(port?.toNumber() || this.app.get('port'), () => {
// eslint-disable-next-line
logger.info(`${appName} server started on port: ${port?.toNumber() || this.app.get('port')}`);
return resolve(this.app);

View File

@@ -1,34 +1,101 @@
import { JSONObject } from 'Common/Types/JSON';
import io from '../Infrastructure/SocketIO';
import {
import IO, { SocketServer, Socket } from '../Infrastructure/SocketIO';
import RealtimeUtil, {
EventName,
ListenToModelEventJSON,
ModelEventType,
} from 'Common/Utils/Realtime';
import { Socket } from 'socket.io';
import DatabaseType from 'Common/Types/BaseDatabase/DatabaseType';
import JSONFunctions from 'Common/Types/JSONFunctions';
import BadDataException from 'Common/Types/Exception/BadDataException';
import ObjectID from 'Common/Types/ObjectID';
import BaseModel from 'Common/Models/BaseModel';
import AnalyticsBaseModel from 'Common/AnalyticsModels/BaseModel';
import logger from './Logger';
export default class Realtime {
public static getRoomId(
tenantId: string | ObjectID,
modelName: string,
eventType: ModelEventType
): string {
return tenantId.toString() + '-' + modelName + '-' + eventType;
export default abstract class Realtime {
private static socketServer: SocketServer | null = null;
public static isInitialized(): boolean {
return this.socketServer !== null;
}
public static init(){
if(!this.socketServer){
this.socketServer = IO.getSocketServer();
logger.info('Realtime socket server initialized');
}
this.socketServer!.on('connection', (socket: Socket) => {
logger.info('Client socket connected!');
debugger;
socket.on(EventName.ListenToModalEvent, async (data: JSONObject) => {
debugger;
// TODO: validate if this soocket has access to this tenant
// TODO: validate if this socket has access to this model
// TODO: validate if this socket has access to this event type
// TODO: validate if this socket has access to this query
// TODO: validate if this socket has access to this select
// validate data
if (typeof data['eventType'] !== 'string') {
throw new BadDataException('eventType is not a string');
}
if (typeof data['modelType'] !== 'string') {
throw new BadDataException('modelType is not a string');
}
if (typeof data['modelName'] !== 'string') {
throw new BadDataException('modelName is not a string');
}
if (typeof data['query'] !== 'object') {
throw new BadDataException('query is not an object');
}
if (typeof data['tenantId'] !== 'string') {
throw new BadDataException('tenantId is not a string');
}
if (typeof data['select'] !== 'object') {
throw new BadDataException('select is not an object');
}
await Realtime.listenToModelEvent(socket, {
eventType: data['eventType'] as ModelEventType,
modelType: data['modelType'] as DatabaseType,
modelName: data['modelName'] as string,
query: JSONFunctions.deserialize(data['query'] as JSONObject),
tenantId: data['tenantId'] as string,
select: JSONFunctions.deserialize(data['select'] as JSONObject),
});
});
});
return this.socketServer;
}
public static async listenToModelEvent(
socket: Socket,
data: ListenToModelEventJSON
): Promise<void> {
if(!this.socketServer){
this.init();
}
// join the room.
await socket.join(
this.getRoomId(data.tenantId, data.modelName, data.eventType)
RealtimeUtil.getRoomId(data.tenantId, data.modelName, data.eventType)
);
}
@@ -36,9 +103,15 @@ export default class Realtime {
socket: Socket,
data: ListenToModelEventJSON
): Promise<void> {
if(!this.socketServer){
this.init();
}
// leave this room.
await socket.leave(
this.getRoomId(data.tenantId, data.modelName, data.eventType)
RealtimeUtil.getRoomId(data.tenantId, data.modelName, data.eventType)
);
}
@@ -48,6 +121,11 @@ export default class Realtime {
model: BaseModel | AnalyticsBaseModel;
modelType: { new (): BaseModel | AnalyticsBaseModel };
}): Promise<void> {
if(!this.socketServer){
this.init();
}
let jsonObject: JSONObject = {};
if (data.model instanceof BaseModel) {
@@ -64,10 +142,10 @@ export default class Realtime {
);
}
io.to(
this.getRoomId(data.tenantId, data.model.tableName!, data.eventType)
this.socketServer!.to(
RealtimeUtil.getRoomId(data.tenantId, data.model.tableName!, data.eventType)
).emit(
this.getRoomId(
RealtimeUtil.getRoomId(
data.tenantId,
data.model.tableName!,
data.eventType
@@ -77,46 +155,4 @@ export default class Realtime {
}
}
io.on('connection', (socket: Socket) => {
socket.on(EventName.ListenToModalEvent, async (data: JSONObject) => {
// TODO: validate if this soocket has access to this tenant
// TODO: validate if this socket has access to this model
// TODO: validate if this socket has access to this event type
// TODO: validate if this socket has access to this query
// TODO: validate if this socket has access to this select
// validate data
if (typeof data['eventType'] !== 'string') {
throw new BadDataException('eventType is not a string');
}
if (typeof data['modelType'] !== 'string') {
throw new BadDataException('modelType is not a string');
}
if (typeof data['modelName'] !== 'string') {
throw new BadDataException('modelName is not a string');
}
if (typeof data['query'] !== 'object') {
throw new BadDataException('query is not an object');
}
if (typeof data['tenantId'] !== 'string') {
throw new BadDataException('tenantId is not a string');
}
if (typeof data['select'] !== 'object') {
throw new BadDataException('select is not an object');
}
await Realtime.listenToModelEvent(socket, {
eventType: data['eventType'] as ModelEventType,
modelType: data['modelType'] as DatabaseType,
modelName: data['modelName'] as string,
query: JSONFunctions.deserialize(data['query'] as JSONObject),
tenantId: data['tenantId'] as string,
select: JSONFunctions.deserialize(data['select'] as JSONObject),
});
});
});

View File

@@ -43,7 +43,7 @@
"nodemailer": "^6.7.3",
"nodemailer-express-handlebars": "^5.0.0",
"pg": "^8.7.3",
"socket.io": "^4.4.1",
"socket.io": "^4.7.2",
"stripe": "^10.17.0",
"twilio": "^4.13.0",
"typeorm": "^0.3.10",

View File

@@ -32,7 +32,9 @@ const LogItem: FunctionComponent<ComponentProps> = (
}}
>
{props.log.time && (
<div className="text-slate-500 courier-prime">
<div className="text-slate-500 courier-prime" style={{
width: "230px !important"
}}>
{OneUptimeDate.getDateAsLocalFormattedString(
props.log.time
)}{' '}
@@ -51,10 +53,10 @@ const LogItem: FunctionComponent<ComponentProps> = (
)}
{props.log.severityText === 'Error' && (
<div className="text-rose-400 courier-prime">
[ERROR] &nbsp;
[ERRR] &nbsp;
</div>
)}
<div className={`${bodyColor} courier-prime`}>
<div className={`${bodyColor} courier-prime truncate`}>
{props.log.body?.toString()}
</div>
</div>
@@ -100,7 +102,7 @@ const LogItem: FunctionComponent<ComponentProps> = (
SEVERITY:
</div>
<div className="text-rose-400 courier-prime">
[ERROR] &nbsp;
[ERRR] &nbsp;
</div>
</div>
)}

View File

@@ -2,7 +2,7 @@ import AnalyticsBaseModel from 'Common/AnalyticsModels/BaseModel';
import BaseModel from 'Common/Models/BaseModel';
import AnalyticsQuery from './AnalyticsModelAPI/Query';
import Query from './ModelAPI/Query';
import {
import RealtimeUtil, {
EventName,
ListenToModelEventJSON,
ModelEventType,
@@ -15,6 +15,8 @@ import JSONFunctions from 'Common/Types/JSONFunctions';
import DatabaseType from 'Common/Types/BaseDatabase/DatabaseType';
import AnalyticsSelect from './AnalyticsModelAPI/Select';
import Select from './ModelAPI/Select';
import { JSONObject } from 'Common/Types/JSON';
import { RealtimeRoute } from 'Common/ServiceRoute';
export interface ListenToAnalyticsModelEvent<Model extends AnalyticsBaseModel> {
modelType: { new (): Model };
@@ -32,23 +34,31 @@ export interface ListenToModelEvent<Model extends BaseModel> {
select: Select<Model>;
}
export default class Reatime {
private socket!: Socket;
export default abstract class Reatime {
private static socket: Socket;
public static init() {
const url: string = REALTIME_URL.toString();
public constructor() {
const socket: Socket = SocketIO(
URL.fromString(REALTIME_URL.toString())
.addRoute('/socket')
.toString()
URL.fromString(url)
.toString(), {
path: RealtimeRoute.toString()
}
);
this.socket = socket;
}
public listenToModelEvent<Model extends BaseModel>(
listenToModelEvent: ListenToModelEvent<Model>
): void {
public static listenToModelEvent<Model extends BaseModel>(
listenToModelEvent: ListenToModelEvent<Model>, onEvent: (model: Model) => void
): () => void {
// conver this to json and send it to the server.
if(!this.socket){
this.init();
}
const listenToModelEventJSON: ListenToModelEventJSON = {
eventType: listenToModelEvent.eventType,
modelType: DatabaseType.Database,
@@ -59,11 +69,28 @@ export default class Reatime {
};
this.socket.emit(EventName.ListenToModalEvent, listenToModelEventJSON);
this.socket.on(RealtimeUtil.getRoomId(listenToModelEvent.tenantId, listenToModelEvent.modelType.name, listenToModelEvent.eventType), (model: JSONObject) => {
onEvent(BaseModel.fromJSON(model, listenToModelEvent.modelType) as Model);
});
// Stop listening to the event.
const stopListening = (): void => {
this.socket.off(RealtimeUtil.getRoomId(listenToModelEvent.tenantId, listenToModelEvent.modelType.name, listenToModelEvent.eventType));
}
return stopListening;
}
public listenToAnalyticsModelEvent<Model extends AnalyticsBaseModel>(
listenToModelEvent: ListenToAnalyticsModelEvent<Model>
): void {
public static listenToAnalyticsModelEvent<Model extends AnalyticsBaseModel>(
listenToModelEvent: ListenToAnalyticsModelEvent<Model>, onEvent: (model: Model) => void
): () => void {
if(!this.socket){
this.init();
}
const listenToModelEventJSON: ListenToModelEventJSON = {
eventType: listenToModelEvent.eventType,
modelType: DatabaseType.AnalyticsDatabase,
@@ -74,5 +101,18 @@ export default class Reatime {
};
this.socket.emit(EventName.ListenToModalEvent, listenToModelEventJSON);
this.socket.on(RealtimeUtil.getRoomId(listenToModelEvent.tenantId, listenToModelEvent.modelType.name, listenToModelEvent.eventType), (model: JSONObject) => {
onEvent(AnalyticsBaseModel.fromJSON(model, listenToModelEvent.modelType) as Model);
});
// Stop listening to the event.
const stopListening = (): void => {
this.socket.off(RealtimeUtil.getRoomId(listenToModelEvent.tenantId, listenToModelEvent.modelType.name, listenToModelEvent.eventType));
}
return stopListening;
}
}

View File

@@ -10,6 +10,9 @@ import API from 'CommonUI/src/Utils/API/API';
import ObjectID from 'Common/Types/ObjectID';
import { LIMIT_PER_PROJECT } from 'Common/Types/Database/LimitMax';
import SortOrder from 'Common/Types/BaseDatabase/SortOrder';
import Realtime from 'CommonUI/src/Utils/Realtime';
import { ModelEventType } from 'Common/Utils/Realtime';
import ProjectUtil from 'CommonUI/src/Utils/Project';
export interface ComponentProps {
id: string;
@@ -23,6 +26,8 @@ const DashboardLogsViewer: FunctionComponent<ComponentProps> = (
const [error, setError] = React.useState<string>('');
const [isLoading, setIsLoading] = React.useState<boolean>(false);
const fetchItems: Function = async () => {
setError('');
setIsLoading(true);
@@ -66,6 +71,30 @@ const DashboardLogsViewer: FunctionComponent<ComponentProps> = (
fetchItems().catch((err: unknown) => {
setError(API.getFriendlyMessage(err));
});
const disconnectFunction = Realtime.listenToAnalyticsModelEvent({
modelType: Log,
query: {},
eventType: ModelEventType.Create,
tenantId: ProjectUtil.getCurrentProjectId()!,
select: {
body: true,
time: true,
projectId: true,
serviceId: true,
spanId: true,
traceId: true,
severityText: true,
}
}, (model: Log)=>{
setLogs((logs) => [...logs, model]);
})
return () => {
disconnectFunction();
}
}, []);
if (error) {

View File

@@ -405,6 +405,7 @@ import Log from 'Model/AnalyticsModels/Log';
import LogService, {
LogService as LogServiceType,
} from 'CommonServer/Services/LogService';
import Realtime from 'CommonServer/Utils/Realtime';
const app: ExpressApplication = Express.getExpressApp();
@@ -1082,6 +1083,9 @@ const init: () => Promise<void> = async (): Promise<void> => {
await ClickhouseAppInstance.connect(
ClickhouseAppInstance.getDatasourceOptions()
);
Realtime.init();
} catch (err) {
logger.error('App Init Failed:');
logger.error(err);

View File

@@ -118,6 +118,11 @@ async Task<String> SendGreeting(ILogger<Program> logger)
// Log a message
logger.LogInformation("Sending greeting");
logger.LogError("Error sending greeting");
logger.LogWarning("Warning sending greeting");
// very big log message
logger.LogInformation("LONG LOG: sdsfdg dfgdfgdfg dfgdfgfdgdfg dfgdfgdfg fdgfdgdf fdjgk gkdfjgf dfkgjdfkgjdfkjgkdfjgk gdkfjgkdfjgkdfj gjdfkjgkdfjgkdfjgk fdjgkdfjgkdfjgkjdfkgj fdkgjfdkgjdfkgjkdfg dfkgjdfkjgkfdjgkfjkgdfjkg fdkgjkfdgjkdfjgkjdkg fdkgjdfkjgk");
// Increment the custom counter
countGreetings.Add(1);

View File

@@ -5,5 +5,5 @@
Please use
```bash
export OTEL_EXPORTER_OTLP_HEADERS=x-oneuptime-service-token=0a00ebc0-7f39-11ee-ac8c-3fb43926b224 && dotnet run --urls=http://localhost:7856/
export OTEL_EXPORTER_OTLP_HEADERS=x-oneuptime-service-token=2310ba80-821f-11ee-89ce-49d0fde7878f && dotnet run --urls=http://localhost:7856/
```

View File

@@ -29,3 +29,5 @@
2.0
2.0
2.0
2.0
2.0