Compare commits

...

1 Commits

Author SHA1 Message Date
Simon Larsen
a18207b04f feat: add isManualExecution flag to workflow execution and related components 2025-02-26 14:08:11 +00:00
15 changed files with 165 additions and 111 deletions

View File

@@ -29,7 +29,10 @@ import WorkspaceUserAuthTokenService from "./WorkspaceUserAuthTokenService";
import WorkspaceMessagePayload, {
WorkspaceMessageBlock,
} from "../../Types/Workspace/WorkspaceMessagePayload";
import WorkspaceProjectAuthToken, { MiscData, SlackMiscData } from "../../Models/DatabaseModels/WorkspaceProjectAuthToken";
import WorkspaceProjectAuthToken, {
MiscData,
SlackMiscData,
} from "../../Models/DatabaseModels/WorkspaceProjectAuthToken";
import WorkspaceProjectAuthTokenService from "./WorkspaceProjectAuthTokenService";
import logger from "../Utils/Logger";
@@ -52,14 +55,16 @@ export class Service extends DatabaseService<Model> {
const miscData: MiscData | undefined = data.projectAuthToken.miscData;
if (!miscData) {
throw new BadDataException("Misc data not found in project auth token");
throw new BadDataException("Misc data not found in project auth token");
}
if(data.workspaceType === WorkspaceType.Slack) {
const userId: string = (miscData as SlackMiscData).botUserId;
if (data.workspaceType === WorkspaceType.Slack) {
const userId: string = (miscData as SlackMiscData).botUserId;
if (!userId) {
throw new BadDataException("Bot user ID not found in project auth token");
throw new BadDataException(
"Bot user ID not found in project auth token",
);
}
return userId;
@@ -68,7 +73,6 @@ export class Service extends DatabaseService<Model> {
throw new BadDataException("Workspace type not supported");
}
public async createInviteAndPostToChannelsBasedOnRules(data: {
projectId: ObjectID;
notificationRuleEventType: NotificationRuleEventType;

View File

@@ -222,6 +222,7 @@ export default class OnTriggerBaseModel<
returnValues: {
data: requestData,
},
isManualExecution: false,
};
promises.push(props.executeWorkflow(executeWorkflow));

View File

@@ -53,6 +53,7 @@ export default class WebhookTrigger extends TriggerCode {
const executeWorkflow: ExecuteWorkflowType = {
workflowId: new ObjectID(workflow._id!),
returnValues: {},
isManualExecution: false,
};
if (
@@ -122,6 +123,7 @@ export default class WebhookTrigger extends TriggerCode {
const executeWorkflow: ExecuteWorkflowType = {
workflowId: new ObjectID(workflow._id!),
returnValues: {},
isManualExecution: false,
};
if (

View File

@@ -89,6 +89,7 @@ export default class WebhookTrigger extends TriggerCode {
"request-params": req.query,
"request-body": req.body,
},
isManualExecution: false,
};
await props.executeWorkflow(executeWorkflow);

View File

@@ -10,6 +10,8 @@ import { Port } from "Common/Types/Workflow/Component";
export interface ExecuteWorkflowType {
workflowId: ObjectID;
returnValues: JSONObject;
// is this workflow triggered manually or not
isManualExecution: boolean;
}
export interface InitProps {

View File

@@ -6,4 +6,5 @@ export interface RunProps {
workflowId: ObjectID;
workflowLogId: ObjectID | null;
timeout: number;
isManualExecution: boolean;
}

View File

@@ -15,7 +15,6 @@ import WorkspaceBase, { WorkspaceChannel } from "../WorkspaceBase";
import WorkspaceType from "../../../../Types/Workspace/WorkspaceType";
export default class SlackUtil extends WorkspaceBase {
public static override async joinChannel(data: {
authToken: string;
channelId: string;
@@ -31,9 +30,9 @@ export default class SlackUtil extends WorkspaceBase {
},
{
Authorization: `Bearer ${data.authToken}`,
['Content-Type']: "application/x-www-form-urlencoded",
["Content-Type"]: "application/x-www-form-urlencoded",
},
);
);
logger.debug("Response from Slack API for joining channel:");
logger.debug(response);
@@ -52,7 +51,6 @@ export default class SlackUtil extends WorkspaceBase {
logger.debug("Channel joined successfully with data:");
logger.debug(data);
}
public static override async inviteUserToChannelByChannelId(data: {
@@ -72,7 +70,7 @@ export default class SlackUtil extends WorkspaceBase {
},
{
Authorization: `Bearer ${data.authToken}`,
['Content-Type']: "application/x-www-form-urlencoded",
["Content-Type"]: "application/x-www-form-urlencoded",
},
);
@@ -99,8 +97,7 @@ export default class SlackUtil extends WorkspaceBase {
channelName: string;
workspaceUserId: string;
}): Promise<void> {
if(data.channelName && data.channelName.startsWith("#")) {
if (data.channelName && data.channelName.startsWith("#")) {
// trim # from channel name
data.channelName = data.channelName.substring(1);
}
@@ -139,9 +136,8 @@ export default class SlackUtil extends WorkspaceBase {
logger.debug(existingWorkspaceChannels);
for (let channelName of data.channelNames) {
// if channel name starts with #, remove it
if(channelName && channelName.startsWith("#")) {
if (channelName && channelName.startsWith("#")) {
channelName = channelName.substring(1);
}
@@ -168,7 +164,6 @@ export default class SlackUtil extends WorkspaceBase {
return workspaceChannels;
}
public static override async getWorkspaceChannelFromChannelName(data: {
authToken: string;
channelName: string;
@@ -176,9 +171,10 @@ export default class SlackUtil extends WorkspaceBase {
logger.debug("Getting workspace channel ID from channel name with data:");
logger.debug(data);
const channels: Dictionary<WorkspaceChannel> = await this.getAllWorkspaceChannels({
authToken: data.authToken,
});
const channels: Dictionary<WorkspaceChannel> =
await this.getAllWorkspaceChannels({
authToken: data.authToken,
});
logger.debug("All workspace channels:");
logger.debug(channels);
@@ -191,7 +187,7 @@ export default class SlackUtil extends WorkspaceBase {
logger.debug("Workspace channel ID obtained:");
logger.debug(channels[data.channelName]!.id);
return channels[data.channelName]!;
return channels[data.channelName]!;
}
public static override async getWorkspaceChannelFromChannelId(data: {
@@ -209,7 +205,7 @@ export default class SlackUtil extends WorkspaceBase {
},
{
Authorization: `Bearer ${data.authToken}`,
['Content-Type']: "application/x-www-form-urlencoded",
["Content-Type"]: "application/x-www-form-urlencoded",
},
);
@@ -262,7 +258,7 @@ export default class SlackUtil extends WorkspaceBase {
{},
{
Authorization: `Bearer ${data.authToken}`,
['Content-Type']: "application/x-www-form-urlencoded",
["Content-Type"]: "application/x-www-form-urlencoded",
},
);
@@ -329,8 +325,7 @@ export default class SlackUtil extends WorkspaceBase {
const channelIdsToPostTo: Array<string> = [];
for (let channelName of data.workspaceMessagePayload.channelNames) {
if(channelName && channelName.startsWith("#")) {
if (channelName && channelName.startsWith("#")) {
// trim # from channel name
channelName = channelName.substring(1);
}
@@ -353,19 +348,18 @@ export default class SlackUtil extends WorkspaceBase {
for (const channelId of channelIdsToPostTo) {
try {
// check if the user is in the channel.
// check if the user is in the channel.
const isUserInChannel = await this.isUserInChannel({
authToken: data.authToken,
channelId: channelId,
userId: data.userId,
});
if(!isUserInChannel) {
// add user to the channel
if (!isUserInChannel) {
// add user to the channel
await this.joinChannel({
authToken: data.authToken,
channelId: channelId
channelId: channelId,
});
}
@@ -376,7 +370,6 @@ export default class SlackUtil extends WorkspaceBase {
});
logger.debug(`Message sent to channel ID ${channelId} successfully.`);
} catch (e) {
logger.error(`Error sending message to channel ID ${channelId}:`);
logger.error(e);
@@ -401,7 +394,7 @@ export default class SlackUtil extends WorkspaceBase {
},
{
Authorization: `Bearer ${data.authToken}`,
['Content-Type']: "application/json",
["Content-Type"]: "application/json",
},
);
@@ -438,7 +431,7 @@ export default class SlackUtil extends WorkspaceBase {
},
{
Authorization: `Bearer ${data.authToken}`,
['Content-Type']: "application/x-www-form-urlencoded",
["Content-Type"]: "application/x-www-form-urlencoded",
},
);
@@ -520,15 +513,12 @@ export default class SlackUtil extends WorkspaceBase {
return markdownBlock;
}
public static override async isUserInChannel(data: {
authToken: string;
channelId: string;
userId: string;
}): Promise<boolean> {
const members: Array<string> = [];
const members: Array<string> = [];
logger.debug("Checking if user is in channel with data:");
logger.debug(data);
@@ -536,63 +526,61 @@ export default class SlackUtil extends WorkspaceBase {
let cursor: string | undefined = undefined;
do {
// check if the user is in the channel, return true if they are, false if they are not
// check if the user is in the channel, return true if they are, false if they are not
const requestBody: JSONObject = {
channel: data.channelId,
limit: 1000,
};
const requestBody: JSONObject = {
channel: data.channelId,
limit: 1000,
};
if(cursor) {
requestBody["cursor"] = cursor;
}
if (cursor) {
requestBody["cursor"] = cursor;
}
const response: HTTPErrorResponse | HTTPResponse<JSONObject> = await API.post<JSONObject>(
URL.fromString("https://slack.com/api/conversations.members"),
requestBody,
{
Authorization: `Bearer ${data.authToken}`,
['Content-Type']: "application/x-www-form-urlencoded",
},
);
const response: HTTPErrorResponse | HTTPResponse<JSONObject> =
await API.post<JSONObject>(
URL.fromString("https://slack.com/api/conversations.members"),
requestBody,
{
Authorization: `Bearer ${data.authToken}`,
["Content-Type"]: "application/x-www-form-urlencoded",
},
);
logger.debug("Response from Slack API for getting channel members:");
logger.debug(response);
logger.debug("Response from Slack API for getting channel members:");
logger.debug(response);
if (response instanceof HTTPErrorResponse) {
logger.error("Error response from Slack API:");
logger.error(response);
throw response;
}
if (response instanceof HTTPErrorResponse) {
logger.error("Error response from Slack API:");
logger.error(response);
throw response;
}
// check for ok response
// check for ok response
if ((response.jsonData as JSONObject)?.["ok"] !== true) {
logger.error("Invalid response from Slack API:");
logger.error(response.jsonData);
throw new BadRequestException("Invalid response");
}
if ((response.jsonData as JSONObject)?.["ok"] !== true) {
logger.error("Invalid response from Slack API:");
logger.error(response.jsonData);
throw new BadRequestException("Invalid response");
}
// check if the user is in the channel
const membersOnThisPage: Array<string> = (
response.jsonData as JSONObject
)["members"] as Array<string>;
// check if the user is in the channel
const membersOnThisPage: Array<string> = (response.jsonData as JSONObject)["members"] as Array<string>;
members.push(...membersOnThisPage);
members.push(...membersOnThisPage);
cursor = (
(response.jsonData as JSONObject)["response_metadata"] as JSONObject
)?.["next_cursor"] as string;
} while (cursor);
cursor = ((response.jsonData as JSONObject)["response_metadata"] as JSONObject)?.["next_cursor"] as string;
} while(cursor);
if(members.includes(data.userId)) {
if (members.includes(data.userId)) {
return true;
}
return false;
}
public static override getButtonBlock(data: {
@@ -623,17 +611,18 @@ export default class SlackUtil extends WorkspaceBase {
logger.debug("Sending message to channel via incoming webhook with data:");
logger.debug(data);
const apiResult: HTTPResponse<JSONObject> | HTTPErrorResponse | null = await API.post(data.url, {
blocks: [
{
type: "section",
text: {
type: "mrkdwn",
text: `${data.text}`,
const apiResult: HTTPResponse<JSONObject> | HTTPErrorResponse | null =
await API.post(data.url, {
blocks: [
{
type: "section",
text: {
type: "mrkdwn",
text: `${data.text}`,
},
},
},
],
});
],
});
logger.debug("Response from Slack API for sending message via webhook:");
logger.debug(apiResult);

View File

@@ -21,7 +21,6 @@ export interface WorkspaceChannel {
}
export default class WorkspaceBase {
public static async joinChannel(_data: {
authToken: string;
channelId: string;
@@ -78,8 +77,7 @@ export default class WorkspaceBase {
authToken: string;
channelId: string;
workspaceUserId: string;
}): Promise<void> {
}
}): Promise<void> {}
public static async createChannelsIfDoesNotExist(_data: {
authToken: string;

View File

@@ -90,6 +90,8 @@ export default interface ComponentMetadata {
outPorts: Array<Port>;
tableName?: string | undefined;
documentationLink?: Route;
// this is used in trigger component to show the manual execution button
runWorkflowManuallyArguments?: Array<Argument> | undefined;
}
export interface ComponentCategory {

View File

@@ -16,7 +16,37 @@ const components: Array<ComponentMetadata> = [
iconProp: IconProp.AltGlobe,
componentType: ComponentType.Trigger,
documentationLink: Route.fromString("/workflow/docs/Webhook.md"),
arguments: [],
arguments: [
],
runWorkflowManuallyArguments: [
{
id: "request-headers",
name: "Request Headers",
description: "Request Headers for this request",
type: ComponentInputType.StringDictionary,
required: false,
placeholder: '{"header1": "value1", "header2": "value2", ....}',
},
{
id: "request-params",
name: "Request Query Params",
description: "Request Query Params for this request",
type: ComponentInputType.StringDictionary,
required: false,
placeholder: '{"query1": "value1", "query2": "value2", ....}',
},
{
id: "request-body",
name: "Request Body",
description: "Request Body",
type: ComponentInputType.JSON,
required: false,
placeholder: '{"key1": "value1", "key2": "value2", ....}',
},
],
returnValues: [
{
id: "request-headers",

View File

@@ -312,16 +312,17 @@ const Delete: FunctionComponent<PageComponentProps> = (): ReactElement => {
}}
onRun={async (component: NodeDataProp) => {
try {
const result: HTTPErrorResponse | HTTPResponse<JSONObject> = await API.post(
URL.fromString(WORKFLOW_URL.toString()).addRoute(
"/manual/run/" + modelId.toString(),
),
{
data: component.returnValues,
},
);
const result: HTTPErrorResponse | HTTPResponse<JSONObject> =
await API.post(
URL.fromString(WORKFLOW_URL.toString()).addRoute(
"/manual/run/" + modelId.toString(),
),
{
data: component.returnValues,
},
);
if(result instanceof HTTPErrorResponse) {
if (result instanceof HTTPErrorResponse) {
throw result;
}

View File

@@ -23,9 +23,8 @@ export default class ManualAPI {
public async manuallyRunWorkflow(
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction
next: NextFunction,
): Promise<void> {
try {
// add this workflow to the run queue and return the 200 response.
@@ -40,12 +39,12 @@ export default class ManualAPI {
await QueueWorkflow.addWorkflowToQueue({
workflowId: new ObjectID(req.params["workflowId"] as string),
returnValues: req.body.data || {},
isManualExecution: true,
});
return Response.sendJsonObjectResponse(req, res, {
status: "Scheduled",
});
} catch (err) {
next(err);
}

View File

@@ -52,6 +52,7 @@ const WorkflowFeatureSet: FeatureSet = {
: null,
arguments: job.data.data as JSONObject,
timeout: WorkflowTimeoutInMs || 5000,
isManualExecution: job.data.isManualExecution || false,
});
},
{ concurrency: 100 },

View File

@@ -187,6 +187,7 @@ export default class QueueWorkflow {
data: executeWorkflow.returnValues,
workflowLogId: workflowLog?._id || null,
workflowId: workflow._id,
isManualExecution: executeWorkflow.isManualExecution, // this is to check if the workflow is triggered manually or not.
},
{
scheduleAt: scheduleAt,

View File

@@ -135,7 +135,9 @@ export default class RunWorkflow {
// form a run stack.
const runStack: RunStack = await this.makeRunStack(workflow.graph);
const runStack: RunStack = await this.makeRunStack({
graph: workflow.graph,
});
const getVariableResult: {
storageMap: StorageMap;
@@ -212,16 +214,34 @@ export default class RunWorkflow {
this.log(args);
this.log("Component Logs: " + executeComponentId);
const result: RunReturnType = await this.runComponent(
args,
stackItem.node,
setDidErrorOut,
);
let result: RunReturnType | null = null;
if (
runProps.isManualExecution &&
stackItem.node.componentType === ComponentType.Trigger
) {
// skip the trigger component if this is a manual execution.
result = {
returnValues: runProps.arguments,
executePort: stackItem.node.metadata.outPorts[0] || undefined,
};
} else {
result = await this.runComponent(
args,
stackItem.node,
setDidErrorOut,
);
}
if (didWorkflowErrorOut) {
throw new BadDataException("Workflow stopped because of an error");
}
if (!result) {
this.log("No result returned from component: " + executeComponentId);
break;
}
this.log("Completed Execution Component: " + executeComponentId);
this.log("Data Returned");
this.log(result.returnValues);
@@ -503,7 +523,9 @@ export default class RunWorkflow {
}
}
public async makeRunStack(graph: JSONObject): Promise<RunStack> {
public async makeRunStack(data: { graph: JSONObject }): Promise<RunStack> {
const graph: JSONObject = data.graph;
const nodes: Array<any> = graph["nodes"] as Array<any>;
const edges: Array<any> = graph["edges"] as Array<any>;