feat: add Clickhouse configuration volume and update AnalyticsDatabaseService to use ResultSet for JSON responses

This commit is contained in:
Simon Larsen
2025-01-30 11:58:43 +00:00
parent 81798211ea
commit 943acc8567
3 changed files with 1813 additions and 26 deletions

1778
Clickhouse/config.xml Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -29,7 +29,7 @@ import logger from "../Utils/Logger";
import Realtime from "../Utils/Realtime";
import StreamUtil from "../Utils/Stream";
import BaseService from "./BaseService";
import { ExecResult } from "@clickhouse/client";
import { ExecResult, ResponseJSON, ResultSet } from "@clickhouse/client";
import AnalyticsBaseModel from "Common/Models/AnalyticsModels/AnalyticsBaseModel/AnalyticsBaseModel";
import { WorkflowRoute } from "Common/ServiceRoute";
import Protocol from "../../Types/API/Protocol";
@@ -148,19 +148,23 @@ export default class AnalyticsDatabaseService<
const countStatement: Statement = this.toCountStatement(countBy);
const dbResult: ExecResult<Stream> = await this.execute(countStatement);
const strResult: string = await StreamUtil.convertStreamToText(
dbResult.stream,
const dbResult: ResultSet<"JSON"> = await this.executeQuery(
countStatement,
);
let countPositive: PositiveNumber = new PositiveNumber(strResult || 0);
logger.debug(`${this.model.tableName} Count Statement executed`);
logger.debug(countStatement);
if (countBy.groupBy && Object.keys(countBy.groupBy).length > 0) {
// this usually happens when group by is used. In this case we count the total number of groups and not rows in those groups.
countPositive = new PositiveNumber(strResult.split("\n").length - 1); // -1 because the last line is empty.
const resultInJSON: ResponseJSON<JSONObject> = (await dbResult.json<JSONObject>());
let countPositive: PositiveNumber = new PositiveNumber(0);
if (resultInJSON.data && resultInJSON.data[0] && resultInJSON.data[0]["count()"] && typeof resultInJSON.data[0]["count()"] === "string") {
countPositive = new PositiveNumber(resultInJSON.data[0]["count()"] as string);
}
logger.debug(`Result: `);
logger.debug(countPositive.toNumber());
countPositive = await this.onCountSuccess(countPositive);
return countPositive;
} catch (error) {
@@ -314,16 +318,6 @@ export default class AnalyticsDatabaseService<
}
}
public async executeQuery(query: string): Promise<string> {
const dbResult: ExecResult<Stream> = await this.execute(query);
const strResult: string = await StreamUtil.convertStreamToText(
dbResult.stream,
);
return strResult;
}
private async _findBy(
findBy: FindBy<TBaseModel>,
): Promise<Array<TBaseModel>> {
@@ -379,21 +373,16 @@ export default class AnalyticsDatabaseService<
columns: Array<string>;
} = this.toFindStatement(onBeforeFind);
const dbResult: ExecResult<Stream> = await this.execute(
const dbResult: ResultSet<"JSON"> = await this.executeQuery(
findStatement.statement,
);
logger.debug(`${this.model.tableName} Find Statement executed`);
logger.debug(findStatement.statement);
const strResult: string = await StreamUtil.convertStreamToText(
dbResult.stream,
);
const responseJSON: ResponseJSON<JSONObject> = (await dbResult.json<JSONObject>())
const jsonItems: Array<JSONObject> = this.convertSelectReturnedDataToJson(
strResult,
findStatement.columns,
);
const jsonItems: Array<JSONObject> = responseJSON.data;
let items: Array<TBaseModel> =
AnalyticsBaseModel.fromJSONArray<TBaseModel>(jsonItems, this.modelType);
@@ -815,6 +804,24 @@ export default class AnalyticsDatabaseService<
) as ExecResult<Stream>;
}
public async executeQuery(
statement: Statement | string,
): Promise<ResultSet<"JSON">> {
if (!this.databaseClient) {
this.useDefaultDatabase();
}
const query: string = statement instanceof Statement ? statement.query : statement;
const queryParams: Record<string, unknown> | undefined = statement instanceof Statement ? statement.query_params : undefined;
return await this.databaseClient.query({
query: query,
format: "JSON",
query_params: queryParams || undefined as any, // undefined is not specified in the type for query_params, but its ok to pass undefined.
})
}
protected async onUpdateSuccess(
onUpdate: OnUpdate<TBaseModel>,
_updatedItemIds: Array<ObjectID>,

View File

@@ -26,6 +26,8 @@ services:
extends:
file: ./docker-compose.base.yml
service: clickhouse
volumes:
- ./Clickhouse/config.xml:/etc/clickhouse-server/config.xml
postgres:
ports: