fix: scheduler edge cases and minor bugs.

This commit is contained in:
Elizabeth
2025-10-21 05:53:21 -05:00
parent 5857ef6d05
commit 909f3120bd
10 changed files with 153 additions and 30 deletions

View File

@@ -20,6 +20,8 @@ class ProcessRunnableCommand extends Command
*/
public function handle(): int
{
$this->cleanupStuckSchedules();
$schedules = Schedule::query()
->with('tasks')
->whereRelation('server', fn (Builder $builder) => $builder->whereNull('status'))
@@ -73,4 +75,32 @@ class ProcessRunnableCommand extends Command
$this->error("An error was encountered while processing Schedule #$schedule->id: " . $exception->getMessage());
}
}
protected function cleanupStuckSchedules(): void
{
$timeout = 600;
$stuck = Schedule::query()
->where('is_processing', true)
->where('updated_at', '<', now()->subSeconds($timeout))
->get();
if ($stuck->count() > 0) {
$this->warn("Found {$stuck->count()} stuck schedule(s), resetting...");
foreach ($stuck as $schedule) {
$schedule->update(['is_processing' => false]);
$schedule->tasks()->update([
'is_queued' => false,
'is_processing' => false,
]);
Log::warning('Reset stuck schedule', [
'schedule_id' => $schedule->id,
'schedule_name' => $schedule->name,
'last_updated' => $schedule->updated_at,
]);
}
}
}
}

View File

@@ -143,6 +143,10 @@ class ScheduleController extends ClientApiController
*/
public function execute(TriggerScheduleRequest $request, Server $server, Schedule $schedule): JsonResponse
{
if ($schedule->server_id !== $server->id) {
throw new NotFoundHttpException();
}
$this->service->handle($schedule, true);
Activity::event('server:schedule.execute')->subject($schedule)->property('name', $schedule->name)->log();

View File

@@ -84,7 +84,7 @@ class ScheduleTaskController extends ClientApiController
Activity::event('server:task.create')
->subject($schedule, $task)
->property(['name' => $schedule->name, 'action' => $task->action, 'payload' => $task->payload])
->property(['name' => $schedule->name, 'action' => $task->action])
->log();
return $this->fractal->item($task)
@@ -139,7 +139,7 @@ class ScheduleTaskController extends ClientApiController
Activity::event('server:task.update')
->subject($schedule, $task)
->property(['name' => $schedule->name, 'action' => $task->action, 'payload' => $task->payload])
->property(['name' => $schedule->name, 'action' => $task->action])
->log();
return $this->fractal->item($task->refresh())
@@ -163,6 +163,10 @@ class ScheduleTaskController extends ClientApiController
throw new HttpForbiddenException('You do not have permission to perform this action.');
}
if ($task->is_queued || $task->is_processing) {
throw new HttpForbiddenException('Cannot delete a task that is currently queued or processing.');
}
$schedule->tasks()
->where('sequence_id', '>', $task->sequence_id)
->decrement('sequence_id');

View File

@@ -22,7 +22,9 @@ class StoreScheduleRequest extends ViewScheduleRequest
'minute' => $rules['cron_minute'],
'hour' => $rules['cron_hour'],
'day_of_month' => $rules['cron_day_of_month'],
'month' => $rules['cron_month'],
'day_of_week' => $rules['cron_day_of_week'],
'only_when_online' => array_merge(['filled'], $rules['only_when_online']),
];
}
}

View File

@@ -20,7 +20,16 @@ class StoreTaskRequest extends ViewScheduleRequest
{
return [
'action' => 'required|in:command,power,backup',
'payload' => 'required_unless:action,backup|string|nullable',
'payload' => [
'required_unless:action,backup',
'string',
'nullable',
function ($attribute, $value, $fail) {
if ($this->input('action') === 'power' && !in_array($value, ['start', 'stop', 'restart', 'kill'])) {
$fail('The power action must be one of: start, stop, restart, kill.');
}
},
],
'time_offset' => 'required|numeric|min:0|max:900',
'sequence_id' => 'sometimes|required|numeric|min:1',
'continue_on_failure' => 'sometimes|required|boolean',

View File

@@ -65,35 +65,37 @@ class RunTaskJob extends Job implements ShouldQueue
$commandRepository->setServer($server)->send($this->task->payload);
break;
case Task::ACTION_BACKUP:
// Mark the task as running before initiating the backup to prevent duplicate runs
$this->task->update(['is_processing' => true]);
$affectedRows = Task::where('id', $this->task->id)
->where('is_processing', false)
->update(['is_processing' => true]);
$ignoredFiles = !empty($this->task->payload) ? explode(PHP_EOL, $this->task->payload) : [];
if ($affectedRows === 0) {
return;
}
$elytraJobService->submitJob(
$server,
'backup_create',
[
'operation' => 'create',
'adapter' => config('backups.default', 'elytra'),
'ignored' => implode("\n", $ignoredFiles),
'name' => 'Scheduled Backup - ' . now()->format('Y-m-d H:i'),
'is_automatic' => true,
],
auth()->user() ?? $server->user
);
try {
$ignoredFiles = !empty($this->task->payload) ? explode(PHP_EOL, $this->task->payload) : [];
$this->task->update(['is_processing' => false]);
$elytraJobService->submitJob(
$server,
'backup_create',
[
'operation' => 'create',
'adapter' => config('backups.default', 'elytra'),
'ignored' => implode("\n", $ignoredFiles),
'name' => 'Scheduled Backup - ' . now()->format('Y-m-d H:i'),
'is_automatic' => true,
],
auth()->user() ?? $server->user
);
} finally {
$this->task->update(['is_processing' => false]);
}
break;
default:
throw new \InvalidArgumentException('Invalid task action provided: ' . $this->task->action);
}
} catch (\Exception $exception) {
// Reset the processing flag if there was an error
if ($this->task->action === Task::ACTION_BACKUP) {
$this->task->update(['is_processing' => false]);
}
// If this isn't a DaemonConnectionException on a task that allows for failures
// throw the exception back up the chain so that the task is stopped.
if (!($this->task->continue_on_failure && $exception instanceof DaemonConnectionException)) {
@@ -110,6 +112,10 @@ class RunTaskJob extends Job implements ShouldQueue
*/
public function failed(?\Exception $exception = null)
{
if ($this->task->action === Task::ACTION_BACKUP) {
$this->task->update(['is_processing' => false]);
}
$this->markTaskNotQueued();
$this->markScheduleComplete();
}

View File

@@ -16,6 +16,7 @@ use Pterodactyl\Contracts\Extensions\HashidsInterface;
* @property string $payload
* @property int $time_offset
* @property bool $is_queued
* @property bool $is_processing
* @property bool $continue_on_failure
* @property \Carbon\Carbon $created_at
* @property \Carbon\Carbon $updated_at
@@ -62,6 +63,7 @@ class Task extends Model
'payload',
'time_offset',
'is_queued',
'is_processing',
'continue_on_failure',
];
@@ -74,6 +76,7 @@ class Task extends Model
'sequence_id' => 'integer',
'time_offset' => 'integer',
'is_queued' => 'boolean',
'is_processing' => 'boolean',
'continue_on_failure' => 'boolean',
];
@@ -83,6 +86,7 @@ class Task extends Model
protected $attributes = [
'time_offset' => 0,
'is_queued' => false,
'is_processing' => false,
'continue_on_failure' => false,
];
@@ -93,6 +97,7 @@ class Task extends Model
'payload' => 'required_unless:action,backup|string',
'time_offset' => 'required|numeric|between:0,900',
'is_queued' => 'boolean',
'is_processing' => 'boolean',
'continue_on_failure' => 'boolean',
];

View File

@@ -34,15 +34,27 @@ class ProcessScheduleService
throw new DisplayException('Cannot process schedule for task execution: no tasks are registered.');
}
$this->connection->transaction(function () use ($schedule, $task) {
$schedule->forceFill([
'is_processing' => true,
'next_run_at' => $schedule->getNextRunDate(),
])->saveOrFail();
$claimed = false;
$this->connection->transaction(function () use ($schedule, $task, &$claimed) {
$affectedRows = Schedule::where('id', $schedule->id)
->where('is_processing', false)
->update([
'is_processing' => true,
'next_run_at' => $schedule->getNextRunDate(),
]);
if ($affectedRows === 0) {
return;
}
$claimed = true;
$task->update(['is_queued' => true]);
});
if (!$claimed) {
return;
}
$job = new RunTaskJob($task, $now);
if ($schedule->only_when_online) {
// Check that the server is currently in a starting or running state before executing
@@ -62,8 +74,9 @@ class ProcessScheduleService
// issue connecting to Wings run the failed sequence for a job. Otherwise we
// can just quietly mark the task as completed without actually running anything.
$job->failed($exception);
} else {
$job->failed();
}
$job->failed();
return;
}

View File

@@ -0,0 +1,22 @@
<?php
use Illuminate\Support\Facades\Schema;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Database\Migrations\Migration;
return new class extends Migration
{
public function up(): void
{
Schema::table('schedules', function (Blueprint $table) {
$table->index(['is_active', 'is_processing', 'next_run_at'], 'schedules_process_index');
});
}
public function down(): void
{
Schema::table('schedules', function (Blueprint $table) {
$table->dropIndex('schedules_process_index');
});
}
};

View File

@@ -0,0 +1,28 @@
<?php
use Illuminate\Support\Facades\Schema;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Database\Migrations\Migration;
return new class extends Migration
{
public function up(): void
{
Schema::table('tasks', function (Blueprint $table) {
$table->dropForeign(['schedule_id']);
$table->dropIndex(['schedule_id', 'sequence_id']);
$table->unique(['schedule_id', 'sequence_id'], 'tasks_schedule_sequence_unique');
$table->foreign('schedule_id')->references('id')->on('schedules')->onDelete('cascade');
});
}
public function down(): void
{
Schema::table('tasks', function (Blueprint $table) {
$table->dropForeign(['schedule_id']);
$table->dropUnique('tasks_schedule_sequence_unique');
$table->index(['schedule_id', 'sequence_id']);
$table->foreign('schedule_id')->references('id')->on('schedules')->onDelete('cascade');
});
}
};