This commit is contained in:
Simon Larsen
2023-03-01 09:27:18 +00:00
parent adc85571b5
commit 4ca0d20231
22 changed files with 125 additions and 142 deletions

View File

@@ -3,7 +3,7 @@ enum ComponentID {
Log = 'log',
Schedule = 'schedule',
JavaScriptCode = 'javascript',
Manual = 'manual'
Manual = 'manual',
}
export default ComponentID;

View File

@@ -21,19 +21,18 @@ export default class Queue {
public static async removeJob(
queueName: QueueName,
jobId: string,
jobId: string
): Promise<void> {
let job: Job | undefined = await this.getQueue(queueName).getJob(jobId);
const job: Job | undefined = await this.getQueue(queueName).getJob(
jobId
);
if (job) {
await job.remove();
}
// remove existing repeatable job
await this.getQueue(queueName).removeRepeatableByKey(jobId)
await this.getQueue(queueName).removeRepeatableByKey(jobId);
}
public static async addJob(
@@ -56,7 +55,9 @@ export default class Queue {
};
}
let job: Job | undefined = await this.getQueue(queueName).getJob(jobId);
const job: Job | undefined = await this.getQueue(queueName).getJob(
jobId
);
if (job) {
await job.remove();
@@ -64,10 +65,16 @@ export default class Queue {
if (options?.repeatableKey) {
// remove existing repeatable job
await this.getQueue(queueName).removeRepeatableByKey(options?.repeatableKey)
await this.getQueue(queueName).removeRepeatableByKey(
options?.repeatableKey
);
}
const jobAdded = await this.getQueue(queueName).add(jobName, data, optionsObject);
const jobAdded: Job = await this.getQueue(queueName).add(
jobName,
data,
optionsObject
);
return jobAdded;
}

View File

@@ -27,8 +27,6 @@ export class Service extends DatabaseService<Model> {
): Promise<OnUpdate<Model>> {
/// save trigger and trigger args.
if (
onUpdate.updateBy.data &&
(onUpdate.updateBy.data as any).graph &&
@@ -36,7 +34,6 @@ export class Service extends DatabaseService<Model> {
'nodes'
] as Array<JSONObject>)
) {
let trigger: NodeDataProp | null = null;
// check if it has a trigger node.
@@ -66,9 +63,12 @@ export class Service extends DatabaseService<Model> {
});
}
await API.post<EmptyResponseData>(
new URL(Protocol.HTTP, WorkflowHostname, new Route('/workflow/update/' + onUpdate.updateBy.query._id!)),
new URL(
Protocol.HTTP,
WorkflowHostname,
new Route('/workflow/update/' + onUpdate.updateBy.query._id!)
),
{},
{
...ClusterKeyAuthorization.getClusterKeyHeaders(),

View File

@@ -7,8 +7,6 @@ import { JSONObject } from 'Common/Types/JSON';
import ObjectID from 'Common/Types/ObjectID';
import ComponentMetadata, { Port } from 'Common/Types/Workflow/Component';
export interface RunOptions {
log: Function;
workflowLogId: ObjectID;
@@ -24,7 +22,7 @@ export interface RunReturnType {
export default class ComponentCode {
private metadata: ComponentMetadata | null = null;
public constructor() {}
public setMetadata(metadata: ComponentMetadata): void {

View File

@@ -26,7 +26,7 @@ const Components: Dictionary<ComponentCode> = {
[ComponentID.Log]: new Log(),
[ComponentID.Schedule]: new Schedule(),
[ComponentID.JavaScriptCode]: new JavaScirptCode(),
[ComponentID.Manual]: new ManualTrigger()
[ComponentID.Manual]: new ManualTrigger(),
};
for (const baseModelService of BaseModelServices) {

View File

@@ -7,10 +7,11 @@ import TriggerCode from '../TriggerCode';
export default class ManualTrigger extends TriggerCode {
public constructor() {
super();
const Component: ComponentMetadata | undefined =
ManualComponents.find((i: ComponentMetadata) => {
const Component: ComponentMetadata | undefined = ManualComponents.find(
(i: ComponentMetadata) => {
return i.id === ComponentID.Manual;
});
}
);
if (!Component) {
throw new BadDataException('Component not found.');

View File

@@ -5,10 +5,7 @@ import ComponentMetadata from 'Common/Types/Workflow/Component';
import DatabaseService from '../../../Services/DatabaseService';
import { ExpressRequest, ExpressResponse } from '../../../Utils/Express';
import Response from '../../../Utils/Response';
import TriggerCode, {
ExecuteWorkflowType,
InitProps,
} from '../TriggerCode';
import TriggerCode, { ExecuteWorkflowType, InitProps } from '../TriggerCode';
import BaseModelComponents from 'Common/Types/Workflow/Components/BaseModel';
import Text from 'Common/Types/Text';
import WorkflowService from '../../../Services/WorkflowService';

View File

@@ -36,7 +36,7 @@ export default class WebhookTrigger extends TriggerCode {
select: {
_id: true,
triggerArguments: true,
isEnabled: true
isEnabled: true,
},
props: {
isRoot: true,
@@ -54,7 +54,8 @@ export default class WebhookTrigger extends TriggerCode {
if (
workflow.triggerArguments &&
workflow.triggerArguments['schedule'] && workflow.isEnabled
workflow.triggerArguments['schedule'] &&
workflow.isEnabled
) {
await props.scheduleWorkflow(
executeWorkflow,
@@ -62,16 +63,13 @@ export default class WebhookTrigger extends TriggerCode {
);
}
if(!workflow.isEnabled){
props.removeWorkflow(workflow.id!);
if (!workflow.isEnabled) {
await props.removeWorkflow(workflow.id!);
}
}
}
public override async update(props: UpdateProps): Promise<void> {
console.log("Schedule update");
const workflow: Workflow | null = await WorkflowService.findOneBy({
query: {
triggerId: ComponentID.Schedule,
@@ -81,24 +79,21 @@ export default class WebhookTrigger extends TriggerCode {
select: {
_id: true,
triggerArguments: true,
isEnabled: true
isEnabled: true,
},
props: {
isRoot: true,
}
},
});
if(!workflow){
if (!workflow) {
return;
}
if(!this.scheduleWorkflow){
if (!this.scheduleWorkflow) {
return;
}
console.log("Workflow enabled");
console.log(workflow.isEnabled);
const executeWorkflow: ExecuteWorkflowType = {
workflowId: new ObjectID(workflow._id!),
returnValues: {},
@@ -106,7 +101,8 @@ export default class WebhookTrigger extends TriggerCode {
if (
workflow.triggerArguments &&
workflow.triggerArguments['schedule'] && workflow.isEnabled
workflow.triggerArguments['schedule'] &&
workflow.isEnabled
) {
await this.scheduleWorkflow(
executeWorkflow,
@@ -114,17 +110,12 @@ export default class WebhookTrigger extends TriggerCode {
);
}
console.log("Removing workflow");
console.log(workflow.isEnabled);
if(!this.removeWorkflow){
if (!this.removeWorkflow) {
return;
}
if(!workflow.isEnabled){
console.log("Here 2");
this.removeWorkflow(workflow.id!);
if (!workflow.isEnabled) {
await this.removeWorkflow(workflow.id!);
}
}
}

View File

@@ -5,10 +5,7 @@ import ComponentID from 'Common/Types/Workflow/ComponentID';
import WebhookComponents from 'Common/Types/Workflow/Components/Webhook';
import { ExpressRequest, ExpressResponse } from '../../../Utils/Express';
import Response from '../../../Utils/Response';
import TriggerCode, {
ExecuteWorkflowType,
InitProps,
} from '../TriggerCode';
import TriggerCode, { ExecuteWorkflowType, InitProps } from '../TriggerCode';
export default class WebhookTrigger extends TriggerCode {
public constructor() {

View File

@@ -18,9 +18,7 @@ export interface InitProps {
executeWorkflow: ExecuteWorkflowType,
scheduleAt: string
) => Promise<void>;
removeWorkflow: ((
workflowId: ObjectID
) => Promise<void>);
removeWorkflow: (workflowId: ObjectID) => Promise<void>;
}
export interface UpdateProps {
@@ -28,26 +26,25 @@ export interface UpdateProps {
}
export default class TrigegrCode extends ComponentCode {
public executeWorkflow:
| ((executeWorkflow: ExecuteWorkflowType) => Promise<void>)
| null = null;
public scheduleWorkflow:
| ((
executeWorkflow: ExecuteWorkflowType,
scheduleAt: string
) => Promise<void>)
| null = null;
public executeWorkflow: ((executeWorkflow: ExecuteWorkflowType) => Promise<void>) | null = null;
public scheduleWorkflow: ((
executeWorkflow: ExecuteWorkflowType,
scheduleAt: string
) => Promise<void>) | null = null;
public removeWorkflow: ((
workflowId: ObjectID
) => Promise<void>) | null = null;
public removeWorkflow: ((workflowId: ObjectID) => Promise<void>) | null =
null;
public constructor() {
super();
}
public async setupComponent(props: InitProps): Promise<void> {
this.executeWorkflow = props.executeWorkflow;
this.scheduleWorkflow = props.scheduleWorkflow;
this.removeWorkflow = props.removeWorkflow;

View File

@@ -1,9 +1,9 @@
import { JSONObject } from "Common/Types/JSON";
import ObjectID from "Common/Types/ObjectID";
import { JSONObject } from 'Common/Types/JSON';
import ObjectID from 'Common/Types/ObjectID';
export interface RunProps {
arguments: JSONObject;
workflowId: ObjectID;
workflowLogId: ObjectID | null;
timeout: number;
}
}

View File

@@ -20,7 +20,6 @@ import CustomFieldType from 'Common/Types/CustomField/CustomFieldType';
import TableBillingAccessControl from 'Common/Types/Database/AccessControl/TableBillingAccessControl';
import { PlanSelect } from 'Common/Types/Billing/SubscriptionPlan';
@TableBillingAccessControl({
create: PlanSelect.Growth,
read: PlanSelect.Growth,

View File

@@ -20,7 +20,6 @@ import CustomFieldType from 'Common/Types/CustomField/CustomFieldType';
import TableBillingAccessControl from 'Common/Types/Database/AccessControl/TableBillingAccessControl';
import { PlanSelect } from 'Common/Types/Billing/SubscriptionPlan';
@TableBillingAccessControl({
create: PlanSelect.Growth,
read: PlanSelect.Growth,

View File

@@ -20,7 +20,6 @@ import CustomFieldType from 'Common/Types/CustomField/CustomFieldType';
import TableBillingAccessControl from 'Common/Types/Database/AccessControl/TableBillingAccessControl';
import { PlanSelect } from 'Common/Types/Billing/SubscriptionPlan';
@TableBillingAccessControl({
create: PlanSelect.Growth,
read: PlanSelect.Growth,

View File

@@ -20,7 +20,6 @@ import CustomFieldType from 'Common/Types/CustomField/CustomFieldType';
import TableBillingAccessControl from 'Common/Types/Database/AccessControl/TableBillingAccessControl';
import { PlanSelect } from 'Common/Types/Billing/SubscriptionPlan';
@TableBillingAccessControl({
create: PlanSelect.Growth,
read: PlanSelect.Growth,

View File

@@ -31,7 +31,6 @@ import AccessControlColumn from 'Common/Types/Database/AccessControlColumn';
import TableBillingAccessControl from 'Common/Types/Database/AccessControl/TableBillingAccessControl';
import { PlanSelect } from 'Common/Types/Billing/SubscriptionPlan';
@TableBillingAccessControl({
create: PlanSelect.Growth,
read: PlanSelect.Growth,
@@ -441,7 +440,6 @@ export default class Workflow extends BaseModel {
})
public triggerArguments?: JSONObject = undefined;
// This is a BullMQ job key that is used to schedule job for this workflow. This is used internally to remove existing job.
@ColumnAccessControl({
create: [],

View File

@@ -20,7 +20,6 @@ import Workflow from './Workflow';
import TableBillingAccessControl from 'Common/Types/Database/AccessControl/TableBillingAccessControl';
import { PlanSelect } from 'Common/Types/Billing/SubscriptionPlan';
@TableBillingAccessControl({
create: PlanSelect.Growth,
read: PlanSelect.Growth,

View File

@@ -25,7 +25,7 @@ export default class ComponentCodeAPI {
router: this.router,
scheduleWorkflow: this.scheduleWorkflow,
executeWorkflow: this.executeWorkflow,
removeWorkflow: this.removeWorkflow
removeWorkflow: this.removeWorkflow,
})
.catch((err: Error) => {
logger.error(err);
@@ -38,7 +38,6 @@ export default class ComponentCodeAPI {
executeWorkflow: ExecuteWorkflowType,
scheduleAt: string
): Promise<void> {
/// add to queue.
await QueueWorkflow.addWorkflowToQueue(executeWorkflow, scheduleAt);
}
@@ -50,12 +49,7 @@ export default class ComponentCodeAPI {
await QueueWorkflow.addWorkflowToQueue(executeWorkflow);
}
public async removeWorkflow(
workflowId: ObjectID
): Promise<void> {
console.log("REMOVE WORKFLOW")
public async removeWorkflow(workflowId: ObjectID): Promise<void> {
// add to queue.
await QueueWorkflow.removeWorkflow(workflowId);
}

View File

@@ -19,9 +19,17 @@ export default class WorkflowAPI {
public constructor() {
this.router = Express.getRouter();
this.router.get(`/update/:workflowId`, ClusterKeyAuthorization.isAuthorizedServiceMiddleware, this.updateWorkflow);
this.router.get(
`/update/:workflowId`,
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
this.updateWorkflow
);
this.router.post(`/update/:workflowId`, ClusterKeyAuthorization.isAuthorizedServiceMiddleware, this.updateWorkflow);
this.router.post(
`/update/:workflowId`,
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
this.updateWorkflow
);
}
public async updateWorkflow(
@@ -46,7 +54,7 @@ export default class WorkflowAPI {
},
props: {
isRoot: true,
}
},
});
if (!workflow) {
@@ -55,13 +63,14 @@ export default class WorkflowAPI {
});
}
if(!workflow.triggerId){
if (!workflow.triggerId) {
return Response.sendJsonObjectResponse(req, res, {
status: 'Trigger not found in workflow',
});
}
const componentCode: ComponentCode | undefined = Components[workflow.triggerId];
const componentCode: ComponentCode | undefined =
Components[workflow.triggerId];
if (!componentCode) {
return Response.sendJsonObjectResponse(req, res, {
@@ -71,7 +80,7 @@ export default class WorkflowAPI {
if (componentCode instanceof TriggerCode) {
await componentCode.update({
workflowId: workflow.id!
workflowId: workflow.id!,
});
}

View File

@@ -39,13 +39,11 @@ app.get(
QueueWorker.getWorker(
QueueName.Workflow,
async (job: QueueJob) => {
console.log("Job Pending: ");
console.log(job.data);
await new RunWorkflow().runWorkflow({
workflowId: new ObjectID(job.data['workflowId'] as string),
workflowLogId: job.data['workflowLogId'] ? new ObjectID(job.data['workflowLogId'] as string): null,
workflowLogId: job.data['workflowLogId']
? new ObjectID(job.data['workflowLogId'] as string)
: null,
arguments: job.data.data as JSONObject,
timeout: 5000,
});

View File

@@ -13,16 +13,16 @@ import QueryHelper from 'CommonServer/Types/Database/QueryHelper';
import WorkflowPlan from 'Common/Types/Workflow/WorkflowPlan';
import PositiveNumber from 'Common/Types/PositiveNumber';
import { PlanSelect } from 'Common/Types/Billing/SubscriptionPlan';
import { Job } from 'bullmq';
export default class QueueWorkflow {
public static async removeWorkflow(workflowId: ObjectID) {
public static async removeWorkflow(workflowId: ObjectID): Promise<void> {
// get workflow to see if its enabled.
const workflow: Workflow | null = await WorkflowService.findOneById({
id: workflowId,
select: {
projectId: true,
repeatableJobKey: true
repeatableJobKey: true,
},
props: {
isRoot: true,
@@ -39,26 +39,21 @@ export default class QueueWorkflow {
);
}
await Queue.removeJob(
QueueName.Workflow,
workflow.repeatableJobKey!
);
await Queue.removeJob(QueueName.Workflow, workflow.repeatableJobKey!);
// update workflow.
// update workflow.
await WorkflowService.updateOneById({
id: workflow.id!,
data: {
repeatableJobKey: null!
repeatableJobKey: null!,
},
props: {
isRoot: true,
ignoreHooks: true
}
ignoreHooks: true,
},
});
}
public static async addWorkflowToQueue(
executeWorkflow: ExecuteWorkflowType,
scheduleAt?: string
@@ -71,7 +66,7 @@ export default class QueueWorkflow {
select: {
isEnabled: true,
projectId: true,
repeatableJobKey: true
repeatableJobKey: true,
},
props: {
isRoot: true,
@@ -143,7 +138,8 @@ export default class QueueWorkflow {
runLog.workflowStatus = WorkflowStatus.WorkflowCountExceeded;
runLog.logs =
OneUptimeDate.getCurrentDateAsFormattedString() +
`: Workflow cannot run because it already ran ${workflowCount.toNumber()} in the last 30 days. Your current plan limit is ${WorkflowPlan[projectPlan.plan]
`: Workflow cannot run because it already ran ${workflowCount.toNumber()} in the last 30 days. Your current plan limit is ${
WorkflowPlan[projectPlan.plan]
}`;
await WorkflowLogService.create({
@@ -177,33 +173,38 @@ export default class QueueWorkflow {
});
}
const job = await Queue.addJob(
const job: Job = await Queue.addJob(
QueueName.Workflow,
workflowLog ? workflowLog._id?.toString()! : workflow._id?.toString()!,
workflowLog ? workflowLog._id?.toString()! : workflow._id?.toString()!,
workflowLog
? workflowLog._id?.toString()!
: workflow._id?.toString()!,
workflowLog
? workflowLog._id?.toString()!
: workflow._id?.toString()!,
{
data: executeWorkflow.returnValues,
workflowLogId: workflowLog?._id || null,
workflowId: workflow._id,
},
{ scheduleAt: scheduleAt, repeatableKey: workflow.repeatableJobKey || undefined }
{
scheduleAt: scheduleAt,
repeatableKey: workflow.repeatableJobKey || undefined,
}
);
// update workflow with repeatable key.
if (job.repeatJobKey) {
// update workflow.
// update workflow.
await WorkflowService.updateOneById({
id: workflow.id!,
data: {
repeatableJobKey: job.repeatJobKey
repeatableJobKey: job.repeatJobKey,
},
props: {
isRoot: true,
ignoreHooks: true
}
ignoreHooks: true,
},
});
}
}

View File

@@ -13,7 +13,7 @@ import WorkflowService from 'CommonServer/Services/WorkflowService';
import ComponentCode, {
RunReturnType,
} from 'CommonServer/Types/Workflow/ComponentCode';
import { RunProps } from "CommonServer/Types/Workflow/Workflow";
import { RunProps } from 'CommonServer/Types/Workflow/Workflow';
import WorkflowVariable from 'Model/Models/WorkflowVariable';
import WorkflowVariableService from 'CommonServer/Services/WorkflowVariableService';
import { LIMIT_PER_PROJECT } from 'Common/Types/Database/LimitMax';
@@ -98,9 +98,8 @@ export default class RunWorkflow {
this.projectId = workflow.projectId || null;
if (!runProps.workflowLogId) {
// create a new workflow log here.
// create a new workflow log here.
// if the workflow is to be run immeidately.
const runLog: WorkflowLog = new WorkflowLog();
runLog.workflowId = runProps.workflowId;
@@ -110,15 +109,16 @@ export default class RunWorkflow {
OneUptimeDate.getCurrentDateAsFormattedString() +
': Workflow Scheduled.';
runProps.workflowLogId = (await WorkflowLogService.create({
data: runLog,
props: {
isRoot: true,
},
})).id!;
runProps.workflowLogId = (
await WorkflowLogService.create({
data: runLog,
props: {
isRoot: true,
},
})
).id!;
}
// update workflow log.
await WorkflowLogService.updateOneById({
id: runProps.workflowLogId,
@@ -158,8 +158,8 @@ export default class RunWorkflow {
if (didWorkflowTimeOut) {
throw new TimeoutException(
'Workflow execution time was more than ' +
runProps.timeout +
'ms and workflow timed-out.'
runProps.timeout +
'ms and workflow timed-out.'
);
}
@@ -171,8 +171,8 @@ export default class RunWorkflow {
if (componentsExecuted.includes(executeComponentId)) {
throw new BadDataException(
'Cyclic Workflow Detected. Cannot execute ' +
executeComponentId +
' when it has already been executed.'
executeComponentId +
' when it has already been executed.'
);
}
@@ -186,8 +186,8 @@ export default class RunWorkflow {
if (!stackItem) {
throw new BadDataException(
'Component with ID ' +
executeComponentId +
' not found.'
executeComponentId +
' not found.'
);
}
@@ -253,7 +253,7 @@ export default class RunWorkflow {
this.log(result.returnValues);
this.log(
'Executing Port: ' + result.executePort?.title ||
'<None>'
'<None>'
);
storageMap.local.components[stackItem.node.id] = {
@@ -515,8 +515,8 @@ export default class RunWorkflow {
} else {
this.logs.push(
OneUptimeDate.getCurrentDateAsFormattedString() +
': ' +
JSON.stringify(data)
': ' +
JSON.stringify(data)
);
}
}
@@ -590,7 +590,7 @@ export default class RunWorkflow {
const trigger: any | undefined = nodes.find((n: any) => {
return (
(n.data as NodeDataProp).componentType ===
ComponentType.Trigger &&
ComponentType.Trigger &&
(n.data as NodeDataProp).nodeType === NodeType.Node
);
});