Your project must not contain invalid function or method calls
- Read doc
- Reliability
- Major
More information: https://insight.symfony.com/what-we-analyse/php.invalid_call
- protected function handleOperationViaFiber(Closure $func): mixed
- {
- $fiber = new Fiber(callback: function (Closure $operation): void {
- $value = $operation();
- Fiber::suspend(value: $value);
- });
- try {
- $return = $fiber->start($func);
- } catch (Throwable $throwable) {
Your project must not contain invalid instantiations
- Read doc
- Reliability
- Major
More information: https://insight.symfony.com/what-we-analyse/php.invalid_instantiation
- $this->logger = $logger ?? new NullLogger();
- }
- protected function handleOperationViaFiber(Closure $func): mixed
- {
- $fiber = new Fiber(callback: function (Closure $operation): void {
- $value = $operation();
- Fiber::suspend(value: $value);
- });
Your project files should use safer permissions
- Read doc
- Security
- Major
More information: https://insight.symfony.com/what-we-analyse/php.too_permissive_file_permissions
chmod a-x 'src/EventListener/StopWorkerOnNextTaskSubscriber.php'
Your project should not contain duplicated code (151 occurrences)
- Read doc
- Productivity
- Minor
More information: https://insight.symfony.com/what-we-analyse/php.duplicated_code
- return $serializer;
- }
- abstract protected function buildConnection(): Connection;
- public function testConnectionCanListEmptyTasks(): void
- {
- $list = $this->connection->list();
- self::assertCount(0, $list);
- }
- public function testConnectionCanRetrieveASingleTask(): void
- {
- $this->connection->setup();
- $this->connection->create(task: new NullTask(name: 'foo'));
- $task = $this->connection->get(taskName: 'foo');
- self::assertInstanceOf(NullTask::class, $task);
- self::assertSame('foo', $task->getName());
- self::assertSame('* * * * *', $task->getExpression());
- }
- public function testMessengerTaskCanBeCreated(): void
- {
- $this->connection->setup();
- $this->connection->create(task: new MessengerTask(name: 'foo', message: new MessengerMessage()));
- $task = $this->connection->get(taskName: 'foo');
- self::assertInstanceOf(MessengerTask::class, $task);
- self::assertInstanceOf(MessengerMessage::class, $task->getMessage());
- self::assertSame('foo', $task->getName());
- self::assertSame('* * * * *', $task->getExpression());
- }
- /**
- * @throws Exception {@see Connection::setup()}
- */
- public function testTaskCanBePaused(): void
- {
- $this->connection->setup();
- $this->connection->create(new NullTask('foo'));
- $task = $this->connection->get('foo');
- }
- /**
- * @throws Exception {@see Connection::setup()}
- */
- public function testTaskCanBePaused(): void
- {
- $this->connection->setup();
- $this->connection->create(new NullTask('foo'));
- $task = $this->connection->get('foo');
- }
- /**
- * @throws Exception {@see Connection::setup()}
- */
- public function testTaskCanBeEnabled(): void
- {
- $this->connection->setup();
- $this->connection->create(new NullTask('foo'));
- $task = $this->connection->get('foo');
- self::assertInstanceOf(NullTask::class, $task);
- self::assertSame('foo', $task->getName());
- self::assertSame('* * * * *', $task->getExpression());
- self::assertSame(TaskInterface::ENABLED, $task->getState());
- $this->connection->pause('foo');
- $task = $this->connection->get('foo');
- self::assertInstanceOf(NullTask::class, $task);
- self::assertSame(TaskInterface::PAUSED, $task->getState());
- self::assertInstanceOf(NullTask::class, $task);
- self::assertSame('foo', $task->getName());
- self::assertSame('* * * * *', $task->getExpression());
- self::assertSame(TaskInterface::ENABLED, $task->getState());
- $this->connection->pause('foo');
- $task = $this->connection->get('foo');
- self::assertInstanceOf(NullTask::class, $task);
- self::assertSame(TaskInterface::PAUSED, $task->getState());
- /**
- * @author Guillaume Loulier <contact@guillaumeloulier.fr>
- */
- final class ConnectionTest extends TestCase
- {
- public function testConnectionCanReturnATaskList(): void
- {
- $serializer = $this->createMock(SerializerInterface::class);
- $queryBuilder = $this->getQueryBuilderMock();
- $queryBuilder->expects(self::exactly(2))->method('select')
- ;
- $queryBuilder->expects(self::once())->method('getSQL')->willReturn('SELECT * FROM _symfony_scheduler_tasks');
- $queryBuilder->expects(self::once())->method('getParameters')->willReturn([]);
- $queryBuilder->expects(self::once())->method('getParameterTypes')->willReturn([]);
- $statement = $this->createMock(class_exists(NextResult::class) ? NextResult::class : Result::class);
- $statement->expects(self::once())->method('fetchOne')->willReturn('0');
- $driverConnection = $this->getDBALConnectionMock();
- $driverConnection->expects(self::once())->method('createQueryBuilder')->willReturn($queryBuilder);
- $driverConnection->expects(self::once())->method('executeQuery')->willReturn($statement);
- ]));
- self::assertEmpty($connection->list());
- }
- public function testConnectionCannotReturnAnInvalidTask(): void
- {
- $serializer = $this->createMock(SerializerInterface::class);
- $expressionBuilder = $this->createMock(ExpressionBuilder::class);
- $expressionBuilder->expects(self::once())->method('eq')
- public function testConnectionCannotReturnAnInvalidTask(): void
- {
- $serializer = $this->createMock(SerializerInterface::class);
- $expressionBuilder = $this->createMock(ExpressionBuilder::class);
- $expressionBuilder->expects(self::once())->method('eq')
- ->with(self::equalTo('t.task_name'), self::equalTo(':name'))
- ->willReturn('t.task_name = :name')
- ;
- public function testConnectionCannotReturnAnInvalidTask(): void
- {
- $serializer = $this->createMock(SerializerInterface::class);
- $expressionBuilder = $this->createMock(ExpressionBuilder::class);
- $expressionBuilder->expects(self::once())->method('eq')
- ->with(self::equalTo('t.task_name'), self::equalTo(':name'))
- ->willReturn('t.task_name = :name')
- ;
- $queryBuilder->expects(self::once())->method('setParameter')->with(
- self::equalTo('name'),
- self::equalTo('foo'),
- self::equalTo(ParameterType::STRING)
- );
- $queryBuilder->expects(self::once())->method('getSQL')
- ->willReturn('SELECT * FROM _symfony_scheduler_tasks WHERE task_name = :name')
- ;
- $queryBuilder->expects(self::once())->method('getParameters')
- ->willReturn(['name' => 'foo'])
- ;
- self::equalTo(ParameterType::STRING)
- );
- $queryBuilder->expects(self::once())->method('getSQL')
- ->willReturn('SELECT * FROM _symfony_scheduler_tasks WHERE task_name = :name')
- ;
- $queryBuilder->expects(self::once())->method('getParameters')
- ->willReturn(['name' => 'foo'])
- ;
- $queryBuilder->expects(self::once())->method('getParameterTypes')
- ->willReturn(['name' => ParameterType::STRING])
- ;
- ->with(self::equalTo('t.task_name'), self::equalTo(':name'))
- ->willReturn('t.task_name = :name')
- ;
- $queryBuilder = $this->getQueryBuilderMock();
- $queryBuilder->expects(self::once())->method('expr')->willReturn($expressionBuilder);
- $queryBuilder->expects(self::exactly(2))->method('select')
- ->withConsecutive([self::equalTo('t.*')], [self::equalTo('COUNT(DISTINCT t.id)')])
- ->willReturnSelf()
- ;
- $queryBuilder->expects(self::once())->method('from')
- ->willReturn('t.task_name = :name')
- ;
- $queryBuilder = $this->getQueryBuilderMock();
- $queryBuilder->expects(self::once())->method('expr')->willReturn($expressionBuilder);
- $queryBuilder->expects(self::exactly(2))->method('select')
- ->withConsecutive([self::equalTo('t.*')], [self::equalTo('COUNT(DISTINCT t.id)')])
- ->willReturnSelf()
- ;
- $queryBuilder->expects(self::once())->method('from')
- ->with(self::equalTo('_symfony_scheduler_tasks'), self::equalTo('t'))
- $queryBuilder->expects(self::once())->method('setParameter')->with(
- self::equalTo('name'),
- self::equalTo('foo'),
- self::equalTo(ParameterType::STRING)
- )->willReturnSelf();
- $queryBuilder->expects(self::once())->method('getSQL')
- ->willReturn('SELECT * FROM _symfony_scheduler_tasks WHERE task_name = :name')
- ;
- $queryBuilder->expects(self::once())->method('getParameters')
- ->willReturn(['name' => 'foo'])
- ;
- public function testConnectionCannotInsertASingleTaskWithDuplicatedIdentifier(): void
- {
- $serializer = $this->createMock(SerializerInterface::class);
- $task = new NullTask('foo');
- $expressionBuilder = $this->createMock(ExpressionBuilder::class);
- $expressionBuilder->expects(self::once())->method('eq')
- ->with(self::equalTo('t.task_name'), self::equalTo(':name'))
- ->willReturn('t.task_name = :name')
- new FirstInFirstOutPolicy(),
- ]));
- $connection->create($task);
- }
- public function testConnectionCannotPauseATaskWithInvalidIdentifier(): void
- {
- $serializer = $this->createMock(SerializerInterface::class);
- $expressionBuilder = $this->createMock(ExpressionBuilder::class);
- $expressionBuilder->expects(self::once())->method('eq')
- ->willReturn('SELECT * FROM _symfony_scheduler_tasks WHERE task_name = :name')
- ;
- $queryBuilder->expects(self::once())->method('getParameters')
- ->willReturn(['name' => 'bar'])
- ;
- $queryBuilder->expects(self::once())->method('getParameterTypes')
- ->willReturn(['name' => ParameterType::STRING])
- ;
- $statement = $this->createMock(class_exists(NextResult::class) ? NextResult::class : Result::class);
- $statement->expects(self::once())->method('fetchOne')->willReturn('0');
- }
- public function testConnectionCanPauseASingleTask(): void
- {
- $task = $this->createMock(TaskInterface::class);
- $task->expects(self::once())->method('setState')->with(self::equalTo(TaskInterface::PAUSED));
- $serializer = $this->createMock(SerializerInterface::class);
- $serializer->expects(self::never())->method('serialize');
- $expressionBuilder = $this->createMock(ExpressionBuilder::class);
- $queryBuilder->expects(self::once())->method('expr')->willReturn($expressionBuilder);
- $queryBuilder->expects(self::exactly(2))->method('select')
- ->withConsecutive([self::equalTo('t.*')], [self::equalTo('COUNT(DISTINCT t.id)')])
- ->willReturnSelf()
- ;
- $queryBuilder->expects(self::once())->method('where')
- ->with(self::equalTo('t.task_name = :name'))
- ;
- $queryBuilder->expects(self::once())->method('setParameter')
- ->with(self::equalTo('name'), self::equalTo('foo'))
- ;
- */
- private function getDBALConnectionMock()
- {
- $platform = $this->createMock(AbstractPlatform::class);
- $platform->method('getWriteLockSQL')->willReturn('FOR UPDATE');
- $platform->method('getReadLockSQL')->willReturn('FOR UPDATE');
- $configuration = $this->createMock(Configuration::class);
- $driverConnection = $this->createMock(Connection::class);
- $driverConnection->method('getDatabasePlatform')->willReturn($platform);
- /**
- * @throws JsonException
- * @throws Throwable {@see TransportInterface::list()}
- */
- public function testTransportCanListTasks(): void
- {
- $serializer = $this->createMock(SerializerInterface::class);
- $queryBuilder = $this->createMock(QueryBuilder::class);
- $queryBuilder->expects(self::exactly(2))->method('select')
- self::assertInstanceOf(LazyTaskList::class, $list);
- self::assertCount(0, $list);
- }
- public function testTransportCanGetATask(): void
- {
- $task = $this->createMock(TaskInterface::class);
- $serializer = $this->createMock(SerializerInterface::class);
- $expressionBuilder = $this->createMock(ExpressionBuilder::class);
- }
- public function testTransportCanGetATask(): void
- {
- $task = $this->createMock(TaskInterface::class);
- $serializer = $this->createMock(SerializerInterface::class);
- $expressionBuilder = $this->createMock(ExpressionBuilder::class);
- $expressionBuilder->expects(self::once())->method('eq')
- ->with(self::equalTo('t.task_name'), self::equalTo(':name'))
- ->willReturn('t.task_name = :name')
- }
- public function testTransportCanGetATask(): void
- {
- $task = $this->createMock(TaskInterface::class);
- $serializer = $this->createMock(SerializerInterface::class);
- $expressionBuilder = $this->createMock(ExpressionBuilder::class);
- $expressionBuilder->expects(self::once())->method('eq')
- ->with(self::equalTo('t.task_name'), self::equalTo(':name'))
- ->willReturn('t.task_name = :name')
- public function testTransportCannotCreateAnExistingTask(): void
- {
- $serializer = $this->createMock(SerializerInterface::class);
- $task = new NullTask('foo');
- $expression = $this->createMock(ExpressionBuilder::class);
- $expression->expects(self::once())->method('eq')
- ->with(self::equalTo('t.task_name'), self::equalTo(':name'))
- ->willReturn('t.task_name = :name')
- ), $objectNormalizer,
- ], [new JsonEncoder()]);
- $objectNormalizer->setSerializer($serializer);
- try {
- $this->connection = new Connection(new InMemoryConfiguration([
- 'host' => $dsn->getHost(),
- 'password' => $dsn->getPassword(),
- 'port' => $dsn->getPort(),
- 'scheme' => $dsn->getScheme(),
- 'timeout' => $dsn->getOption('timeout', 30),
- self::expectExceptionMessage('The list is not initialized');
- self::expectExceptionCode(0);
- $connection->list();
- }
- public function testConnectionCanConnectWithoutSpecificPort(): void
- {
- $serializer = $this->createMock(SerializerInterface::class);
- $redis = $this->createMock(Redis::class);
- $redis->expects(self::once())->method('connect')->with(
- $connection->create($taskToCreate);
- }
- public function testConnectionCannotGetUndefinedTask(): void
- {
- $serializer = $this->createMock(SerializerInterface::class);
- $redis = $this->createMock(Redis::class);
- $redis->expects(self::once())->method('connect');
- $redis->expects(self::once())->method('select')->with(self::equalTo(0))->willReturn(true);
- $redis->expects(self::once())->method('auth')->willReturn(true);
- {
- $serializer = $this->createMock(SerializerInterface::class);
- $redis = $this->createMock(Redis::class);
- $redis->expects(self::once())->method('connect');
- $redis->expects(self::once())->method('select')->with(self::equalTo(0))->willReturn(true);
- $redis->expects(self::once())->method('auth')->willReturn(true);
- $redis->expects(self::once())->method('hExists')->willReturn(false);
- $connection = new Connection(new InMemoryConfiguration([
- 'host' => 'localhost',
- self::expectExceptionMessage('The task "foo" cannot be updated as it does not exist');
- self::expectExceptionCode(0);
- $connection->update('foo', $task);
- }
- public function testConnectionCannotUpdateWithError(): void
- {
- $serializer = $this->createMock(SerializerInterface::class);
- $serializer->expects(self::once())->method('serialize')->willReturn('foo');
- $task = $this->createMock(TaskInterface::class);
- $redis = $this->createMock(Redis::class);
- $redis->expects(self::once())->method('connect');
- $redis->expects(self::once())->method('select')->with(self::equalTo(0))->willReturn(true);
- $redis->expects(self::once())->method('auth')->willReturn(true);
- $redis->expects(self::once())->method('hExists')->willReturn(true);
- $redis->expects(self::once())->method('hSet')->willReturn(false);
- $redis->expects(self::once())->method('getLastError')->willReturn('Random error');
- $connection = new Connection(new InMemoryConfiguration([
- 'host' => 'localhost',
- 'timeout' => 30,
- $serializer = $this->createMock(SerializerInterface::class);
- $serializer->expects(self::once())->method('deserialize')->willReturn($task);
- $redis = $this->createMock(Redis::class);
- $redis->expects(self::once())->method('select')->with(self::equalTo(0))->willReturn(true);
- $redis->expects(self::once())->method('auth')->willReturn(true);
- $redis->expects(self::once())->method('hExists')->willReturn(true);
- $redis->expects(self::once())->method('hGet');
- $connection = new Connection(new InMemoryConfiguration([
- 'host' => 'localhost',
- self::expectExceptionMessage('The task "foo" is already paused');
- self::expectExceptionCode(0);
- $connection->pause('foo');
- }
- public function testConnectionCannotPauseWithUpdateException(): void
- {
- $task = $this->createMock(TaskInterface::class);
- $task->expects(self::once())->method('getState')->willReturn(TaskInterface::ENABLED);
- $task->expects(self::once())->method('setState')->with(self::equalTo(TaskInterface::PAUSED))->willReturnSelf();
- $task->expects(self::once())->method('getState')->willReturn(TaskInterface::ENABLED);
- $task->expects(self::once())->method('setState')->with(self::equalTo(TaskInterface::PAUSED))->willReturnSelf();
- $serializer = $this->createMock(SerializerInterface::class);
- $serializer->expects(self::once())->method('deserialize')->willReturn($task);
- $serializer->expects(self::once())->method('serialize')->with($task, 'json')->willReturn('foo');
- $redis = $this->createMock(Redis::class);
- $redis->expects(self::once())->method('auth')->willReturn(true);
- $redis->expects(self::once())->method('select')->with(self::equalTo(0))->willReturn(true);
- $redis->expects(self::exactly(2))->method('hExists')->willReturn(true);
- self::expectExceptionMessage('The task "foo" is already enabled');
- self::expectExceptionCode(0);
- $connection->resume('foo');
- }
- public function testConnectionCannotResumeTaskWithUpdateException(): void
- {
- $task = $this->createMock(TaskInterface::class);
- $task->expects(self::once())->method('getState')->willReturn(TaskInterface::PAUSED);
- $task->expects(self::once())->method('setState')->with(self::equalTo(TaskInterface::ENABLED))->willReturnSelf();
- }
- /**
- * @dataProvider provideList
- */
- public function testConnectionCannotEmptyWithException(string $list): void
- {
- $serializer = $this->createMock(SerializerInterface::class);
- $redis = $this->createMock(Redis::class);
- $redis->expects(self::once())->method('auth')->willReturn(true);
- public function testCommandCannotDisplayTaskOutputWithoutVeryVerbose(): void
- {
- $eventDispatcher = new EventDispatcher();
- $task = new NullTask('foo', [
- 'execution_memory_usage' => 9_507_552,
- ]);
- $scheduler = $this->createMock(SchedulerInterface::class);
- $scheduler->expects(self::exactly(2))->method('getDueTasks')->willReturn(new TaskList([$task]));
- $task = new NullTask('foo', [
- 'execution_memory_usage' => 9_507_552,
- ]);
- $scheduler = $this->createMock(SchedulerInterface::class);
- $scheduler->expects(self::exactly(2))->method('getDueTasks')->willReturn(new TaskList([$task]));
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $runner = $this->createMock(RunnerInterface::class);
- $task = new NullTask('foo', [
- 'execution_memory_usage' => 9_507_552,
- ]);
- $scheduler = $this->createMock(SchedulerInterface::class);
- $scheduler->expects(self::exactly(2))->method('getDueTasks')->willReturn(new TaskList([$task]));
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $runner = $this->createMock(RunnerInterface::class);
- }
- /**
- * @throws Throwable
- */
- public function testCommandCanExecuteDueTasksWithSpecificName(): void
- {
- $task = new NullTask('foo');
- $logger = $this->createMock(LoggerInterface::class);
- $eventDispatcher = $this->createMock(EventDispatcherInterface::class);
- }
- /**
- * @throws Throwable
- */
- public function testCommandCanExecuteDueTasksWithSpecificName(): void
- {
- $task = new NullTask('foo');
- $logger = $this->createMock(LoggerInterface::class);
- $eventDispatcher = $this->createMock(EventDispatcherInterface::class);
- }
- /**
- * @throws Throwable
- */
- public function testCommandCanExecuteMultipleDueTasksWithSpecificName(): void
- {
- $task = new NullTask('foo');
- $secondTask = new NullTask('bar');
- $logger = $this->createMock(LoggerInterface::class);
- * @throws Throwable
- */
- public function testCommandCanExecuteWholeTasksListWithSpecificExpression(): void
- {
- $task = new NullTask('foo');
- $eventDispatcher = $this->createMock(EventDispatcherInterface::class);
- $worker = $this->createMock(WorkerInterface::class);
- $worker->expects(self::once())->method('execute')->with(self::equalTo(WorkerConfiguration::create()), self::equalTo($task));
- $scheduler = $this->createMock(SchedulerInterface::class);
- $logger = $this->createMock(LoggerInterface::class);
- $eventDispatcher = $this->createMock(EventDispatcher::class);
- $eventDispatcher->expects(self::once())->method('addSubscriber')->with(new StopWorkerOnTaskLimitSubscriber(1, $logger));
- $task = $this->createMock(TaskInterface::class);
- $task->expects(self::exactly(3))->method('getName')->willReturn('foo');
- $task->expects(self::once())->method('getExpression')->willReturn('@reboot');
- $task->expects(self::once())->method('getState')->willReturn(TaskInterface::ENABLED);
- $task->expects(self::once())->method('getTags')->willReturn(['app', 'slow']);
- $logger = $this->createMock(LoggerInterface::class);
- $eventDispatcher = $this->createMock(EventDispatcher::class);
- $eventDispatcher->expects(self::once())->method('addSubscriber')->with(new StopWorkerOnTaskLimitSubscriber(1, $logger));
- $task = $this->createMock(TaskInterface::class);
- $task->expects(self::exactly(3))->method('getName')->willReturn('foo');
- $task->expects(self::once())->method('getExpression')->willReturn('@reboot');
- $task->expects(self::once())->method('getState')->willReturn(TaskInterface::ENABLED);
- $task->expects(self::once())->method('getTags')->willReturn(['app', 'slow']);
- $commandTester->execute([]);
- self::assertSame(Command::SUCCESS, $commandTester->getStatusCode());
- self::assertStringContainsString('[WARNING] The scheduler cannot be rebooted as the worker is not available', $commandTester->getDisplay());
- self::assertStringContainsString('The process will be retried as soon as the worker is available', $commandTester->getDisplay());
- self::assertStringContainsString('[OK] The scheduler have been rebooted', $commandTester->getDisplay());
- self::assertStringContainsString('Name', $commandTester->getDisplay());
- self::assertStringContainsString('foo', $commandTester->getDisplay());
- self::assertStringNotContainsString('bar', $commandTester->getDisplay());
- self::assertStringContainsString('Type', $commandTester->getDisplay());
- self::assertStringContainsString('State', $commandTester->getDisplay());
- self::assertSame(Command::FAILURE, $commandTester->getStatusCode());
- self::assertStringContainsString('[WARNING] The task "foo" has not been retried', $commandTester->getDisplay());
- }
- public function testCommandCanRetryTaskWithForceOption(): void
- {
- $logger = $this->createMock(LoggerInterface::class);
- $eventDispatcher = $this->createMock(EventDispatcherInterface::class);
- $eventDispatcher->expects(self::once())->method('dispatch')
- ],
- ],
- ],
- ]);
- self::assertArrayHasKey('probe', $configuration);
- self::assertTrue($configuration['probe']['enabled']);
- self::assertSame('/_foo', $configuration['probe']['path']);
- self::assertArrayHasKey('clients', $configuration['probe']);
- self::assertNotEmpty($configuration['probe']['clients']);
- self::assertArrayHasKey('bar.probe', $configuration['tasks']);
- self::assertSame('probe', $configuration['tasks']['bar.probe']['type']);
- self::assertSame('* * * * *', $configuration['tasks']['bar.probe']['expression']);
- }
- public function testConfigurationCanDefineProbeClientsWithExistingTasks(): void
- {
- $configuration = (new Processor())->processConfiguration(new SchedulerBundleConfiguration(), [
- 'scheduler_bundle' => [
- 'transport' => [
- 'dsn' => 'cache://memory',
- ],
- 'tasks' => [],
- 'lock_store' => null,
- ]);
- self::assertTrue($container->hasDefinition(Scheduler::class));
- self::assertTrue($container->hasAlias(SchedulerInterface::class));
- self::assertCount(5, $container->getDefinition(Scheduler::class)->getArguments());
- self::assertSame('Europe/Paris', $container->getDefinition(Scheduler::class)->getArgument(0));
- self::assertInstanceOf(Reference::class, $container->getDefinition(Scheduler::class)->getArgument(1));
- self::assertSame(TransportInterface::class, (string) $container->getDefinition(Scheduler::class)->getArgument(1));
- 'tasks' => [],
- 'lock_store' => null,
- ]);
- self::assertTrue($container->hasParameter('scheduler.scheduler_mode'));
- self::assertSame('lazy', $container->getParameter('scheduler.scheduler_mode'));
- self::assertTrue($container->hasDefinition(Scheduler::class));
- self::assertTrue($container->hasAlias(SchedulerInterface::class));
- self::assertCount(5, $container->getDefinition(Scheduler::class)->getArguments());
- self::assertSame('Europe/Paris', $container->getDefinition(Scheduler::class)->getArgument(0));
- ],
- 'tasks' => [],
- 'lock_store' => null,
- ]);
- self::assertTrue($container->hasDefinition(Worker::class));
- self::assertTrue($container->hasAlias(WorkerInterface::class));
- self::assertCount(8, $container->getDefinition(Worker::class)->getArguments());
- self::assertInstanceOf(Reference::class, $container->getDefinition(Worker::class)->getArgument(0));
- self::assertSame(SchedulerInterface::class, (string) $container->getDefinition(Worker::class)->getArgument(0));
- self::assertSame(ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE, $container->getDefinition(Worker::class)->getArgument(0)->getInvalidBehavior());
- self::assertSame(ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE, $container->getDefinition(Worker::class)->getArgument(6)->getInvalidBehavior());
- self::assertInstanceOf(Reference::class, $container->getDefinition(Worker::class)->getArgument(7));
- self::assertSame(LoggerInterface::class, (string) $container->getDefinition(Worker::class)->getArgument(7));
- self::assertSame(ContainerInterface::NULL_ON_INVALID_REFERENCE, $container->getDefinition(Worker::class)->getArgument(7)->getInvalidBehavior());
- self::assertFalse($container->getDefinition(Worker::class)->isPublic());
- self::assertCount(4, $container->getDefinition(Worker::class)->getTags());
- self::assertTrue($container->getDefinition(Worker::class)->hasTag('scheduler.worker'));
- self::assertTrue($container->getDefinition(Worker::class)->hasTag('monolog.logger'));
- self::assertSame('scheduler', $container->getDefinition(Worker::class)->getTag('monolog.logger')[0]['channel']);
- self::assertTrue($container->getDefinition(Worker::class)->hasTag('container.hot_path'));
- self::assertTrue($container->getDefinition(Worker::class)->hasTag('container.preload'));
- self::assertTrue($container->hasDefinition('scheduler.lock_store.factory'));
- self::assertSame(LockFactory::class, $container->getDefinition('scheduler.lock_store.factory')->getClass());
- self::assertFalse($container->getDefinition('scheduler.lock_store.factory')->isPublic());
- self::assertCount(1, $container->getDefinition('scheduler.lock_store.factory')->getArguments());
- self::assertInstanceOf(Reference::class, $container->getDefinition('scheduler.lock_store.factory')->getArgument('$store'));
- self::assertSame('scheduler.lock_store.store', (string) $container->getDefinition('scheduler.lock_store.factory')->getArgument('$store'));
- self::assertSame(ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE, $container->getDefinition('scheduler.lock_store.factory')->getArgument('$store')->getInvalidBehavior());
- self::assertCount(1, $container->getDefinition('scheduler.lock_store.factory')->getMethodCalls());
- self::assertSame('setLogger', $container->getDefinition('scheduler.lock_store.factory')->getMethodCalls()[0][0]);
- self::assertInstanceOf(Reference::class, $container->getDefinition('scheduler.lock_store.factory')->getMethodCalls()[0][1][0]);
- self::assertSame(LoggerInterface::class, (string) $container->getDefinition('scheduler.lock_store.factory')->getMethodCalls()[0][1][0]);
- 'expression' => '*/5 * * * *',
- 'description' => 'A simple cache clear task',
- 'options' => [
- 'env' => 'test',
- ],
- ], $container->getDefinition('scheduler._foo_task')->getArgument(0));
- self::assertFalse($container->getDefinition('scheduler._foo_task')->isPublic());
- $factory = $container->getDefinition('scheduler._foo_task')->getFactory();
- self::assertIsArray($factory);
- self::assertArrayHasKey(0, $factory);
- self::assertSame('scheduler.middleware', $container->getDefinition(MiddlewareRegistry::class)->getArgument(0)->getTag());
- self::assertCount(1, $container->getDefinition(MiddlewareRegistry::class)->getTags());
- self::assertTrue($container->getDefinition(MiddlewareRegistry::class)->hasTag('container.preload'));
- self::assertSame(MiddlewareRegistry::class, $container->getDefinition(MiddlewareRegistry::class)->getTag('container.preload')[0]['class']);
- self::assertTrue($container->hasAlias(SchedulerMiddlewareStackInterface::class));
- self::assertTrue($container->hasDefinition(SchedulerMiddlewareStack::class));
- self::assertFalse($container->getDefinition(SchedulerMiddlewareStack::class)->isPublic());
- self::assertCount(1, $container->getDefinition(SchedulerMiddlewareStack::class)->getArguments());
- self::assertInstanceOf(Reference::class, $container->getDefinition(SchedulerMiddlewareStack::class)->getArgument(0));
- self::assertSame(MiddlewareRegistryInterface::class, (string) $container->getDefinition(SchedulerMiddlewareStack::class)->getArgument(0));
- self::assertTrue($container->getDefinition(SchedulerMiddlewareStack::class)->hasTag('scheduler.middleware_hub'));
- self::assertTrue($container->getDefinition(SchedulerMiddlewareStack::class)->hasTag('container.hot_path'));
- self::assertTrue($container->getDefinition(SchedulerMiddlewareStack::class)->hasTag('container.preload'));
- self::assertSame(SchedulerMiddlewareStack::class, $container->getDefinition(SchedulerMiddlewareStack::class)->getTag('container.preload')[0]['class']);
- self::assertTrue($container->hasAlias(WorkerMiddlewareStackInterface::class));
- self::assertTrue($container->hasDefinition(WorkerMiddlewareStack::class));
- self::assertFalse($container->getDefinition(WorkerMiddlewareStack::class)->isPublic());
- self::assertCount(1, $container->getDefinition(WorkerMiddlewareStack::class)->getArguments());
- self::assertInstanceOf(Reference::class, $container->getDefinition(WorkerMiddlewareStack::class)->getArgument(0));
- self::assertSame(MiddlewareRegistryInterface::class, (string) $container->getDefinition(WorkerMiddlewareStack::class)->getArgument(0));
- self::assertArrayHasKey('task_filter', $request->attributes->all());
- self::assertSame('app.bar', $request->attributes->get('task_filter'));
- }
- public function testValidPathCanBeHandledWithValidExpression(): void
- {
- $objectNormalizer = new ObjectNormalizer(null, null, null, new PropertyInfoExtractor([], [new PhpDocExtractor(), new ReflectionExtractor()]));
- $notificationTaskBagNormalizer = new NotificationTaskBagNormalizer($objectNormalizer);
- $lockTaskBagNormalizer = new AccessLockBagNormalizer($objectNormalizer);
- $workerLifecycleSubscriber = new WorkerLifecycleSubscriber($logger);
- $workerLifecycleSubscriber->onWorkerRestarted(new WorkerRestartedEvent($worker));
- }
- public function testSubscriberLogOnWorkerRestarted(): void
- {
- $task = $this->createMock(TaskInterface::class);
- $task->expects(self::once())->method('getName')->willReturn('foo');
- $worker = $this->createMock(WorkerInterface::class);
- /**
- * @throws Exception {@see Scheduler::__construct()}
- * @throws Throwable {@see SchedulerInterface::schedule()}
- */
- public function testSchedulerCanScheduleTasksWithBeforeSchedulingNotificationAndWithNotifier(): void
- {
- $notification = $this->createMock(Notification::class);
- $recipient = $this->createMock(Recipient::class);
- $notifier = $this->createMock(NotifierInterface::class);
- 'execution_mode' => 'first_in_first_out',
- ]), new SchedulePolicyOrchestrator([
- new FirstInFirstOutPolicy(),
- ])),
- new SchedulerMiddlewareStack(),
- new EventDispatcher()
- ));
- $scheduler->schedule($task);
- $scheduler->schedule($secondTask);
- $scheduler->schedule($thirdTask);
- 'execution_mode' => 'first_in_first_out',
- ]), new SchedulePolicyOrchestrator([
- new FirstInFirstOutPolicy(),
- ])),
- new SchedulerMiddlewareStack(),
- new EventDispatcher()
- ));
- $scheduler->schedule($task);
- $scheduler->schedule($secondTask);
- $scheduler->schedule($thirdTask);
- 'execution_mode' => 'first_in_first_out',
- ]), new SchedulePolicyOrchestrator([
- new FirstInFirstOutPolicy(),
- ])),
- new SchedulerMiddlewareStack(),
- new EventDispatcher()
- ));
- $scheduler->schedule($task);
- $scheduler->schedule($secondTask);
- $scheduler->schedule($thirdTask);
- {
- $scheduler = new FiberScheduler(new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration([
- 'execution_mode' => 'first_in_first_out',
- ]), new SchedulePolicyOrchestrator([
- new FirstInFirstOutPolicy(),
- ])), new SchedulerMiddlewareStack(), new EventDispatcher()));
- $scheduler->schedule($task);
- self::assertInstanceOf(LazyTaskList::class, $scheduler->getTasks(true));
- self::assertCount(1, $scheduler->getTasks(true));
- $updatedTask = new NullTask('bar');
- $bus = $this->createMock(MessageBusInterface::class);
- $bus->expects(self::never())->method('dispatch');
- $scheduler = new FiberScheduler(new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([
- new FirstInFirstOutPolicy(),
- ])), new SchedulerMiddlewareStack(new MiddlewareRegistry([])), new EventDispatcher(), $bus));
- $scheduler->schedule($task);
- self::assertCount(1, $scheduler->getTasks());
- new TaskToUpdateMessageHandler($transport),
- ],
- ])),
- ]);
- $scheduler = new FiberScheduler(new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(new MiddlewareRegistry([])), new EventDispatcher(), $bus));
- $scheduler->schedule($task);
- self::assertCount(1, $scheduler->getTasks());
- self::assertSame('* * * * *', $scheduler->getTasks()->get('foo')->getExpression());
- public function testTaskCanBeUpdatedThenRetrieved(TaskInterface $task): void
- {
- $bus = $this->createMock(MessageBusInterface::class);
- $bus->expects(self::never())->method('dispatch');
- $scheduler = new FiberScheduler(new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration([
- 'execution_mode' => 'first_in_first_out',
- ]), new SchedulePolicyOrchestrator([
- new FirstInFirstOutPolicy(),
- ])), new SchedulerMiddlewareStack(new MiddlewareRegistry([])), new EventDispatcher(), $bus));
- public function testTaskCanBeUpdatedThenLazilyRetrieved(TaskInterface $task): void
- {
- $bus = $this->createMock(MessageBusInterface::class);
- $bus->expects(self::never())->method('dispatch');
- $scheduler = new FiberScheduler(new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration([
- 'execution_mode' => 'first_in_first_out',
- ]), new SchedulePolicyOrchestrator([
- new FirstInFirstOutPolicy(),
- ])), new SchedulerMiddlewareStack(new MiddlewareRegistry([])), new EventDispatcher(), $bus));
- $scheduler = new FiberScheduler(new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration([
- 'execution_mode' => 'first_in_first_out',
- ]), new SchedulePolicyOrchestrator([
- new FirstInFirstOutPolicy(),
- ])), new SchedulerMiddlewareStack([]), new EventDispatcher()));
- $scheduler->schedule(new NullTask('foo'));
- $list = $scheduler->getTasks();
- self::assertCount(1, $list);
- $worker = new Worker($scheduler, new RunnerRegistry([
- new NullTaskRunner(),
- ]), new ExecutionPolicyRegistry([
- new DefaultPolicy(),
- ]), new TaskExecutionTracker(new Stopwatch()), new WorkerMiddlewareStack([
- new SingleRunTaskMiddleware($transport),
- new TaskUpdateMiddleware($transport),
- new TaskLockBagMiddleware($lockFactory),
- ]), $eventDispatcher, $lockFactory, $logger);
- $worker->execute(WorkerConfiguration::create());
- self::assertFalse($scheduler->isInitialized());
- $scheduler->schedule(new NullTask('foo'));
- $scheduler->schedule(new NullTask('bar'));
- $scheduler->schedule(new NullTask('reboot'));
- $scheduler->preempt('foo', static fn (TaskInterface $task): bool => $task->getName() === 'reboot');
- $lockFactory = new LockFactory(new InMemoryStore());
- $worker = new Worker($scheduler, new RunnerRegistry([
- new NullTaskRunner(),
- $schedulerMiddlewareStack = new FiberAwareSchedulerMiddlewareStack(new SchedulerMiddlewareStack(new MiddlewareRegistry([
- $middleware,
- $secondMiddleware,
- ])), $logger);
- $schedulerMiddlewareStack->runPreSchedulingMiddleware($task, $scheduler);
- }
- /**
- * @throws Throwable {@see SchedulerMiddlewareStack::runPreSchedulingMiddleware()}
- */
- $secondMiddleware,
- $thirdMiddleware,
- $fourthMiddleware,
- ])), $logger);
- self::expectException(RuntimeException::class);
- self::expectExceptionMessage('An error occurred');
- self::expectExceptionCode(0);
- $schedulerMiddlewareStack->runPreSchedulingMiddleware($task, $scheduler);
- }
- $thirdOrderedMiddleware->expects(self::exactly(2))->method('getPriority')->willReturn(0);
- $thirdOrderedMiddleware->expects(self::once())->method('preScheduling')->with(self::equalTo($task), self::equalTo($scheduler));
- $fourthOrderedMiddleware = $this->createMock(OrderedMiddleware::class);
- $fourthOrderedMiddleware->expects(self::exactly(2))->method('getPriority')->willReturn(1);
- $fourthOrderedMiddleware->expects(self::once())->method('preScheduling')->with(self::equalTo($task), self::equalTo($scheduler));
- $middleware = $this->createMock(PreSchedulingMiddlewareInterface::class);
- $middleware->expects(self::once())->method('preScheduling')->with(self::equalTo($task), self::equalTo($scheduler));
- $secondMiddleware = $this->createMock(PostSchedulingMiddlewareInterface::class);
- $secondOrderedMiddleware,
- $thirdOrderedMiddleware,
- $fourthOrderedMiddleware,
- ])), $logger);
- $schedulerMiddlewareStack->runPreSchedulingMiddleware($task, $scheduler);
- }
- /**
- * @throws Throwable {@see SchedulerMiddlewareStack::runPreSchedulingMiddleware()}
- */
- self::assertSame(Output::ERROR, $output->getType());
- self::assertNull($output->getOutput());
- self::assertInstanceOf(ShellTask::class, $output->getTask());
- }
- public function testApplicationCanReturnValidCode(): void
- {
- $worker = $this->createMock(WorkerInterface::class);
- $commandTask = new CommandTask('foo', 'app:foo');
- self::assertSame('The probe state is invalid', $output->getOutput());
- self::assertInstanceOf(ProbeTask::class, $output->getTask());
- self::assertNull($output->getTask()->getExecutionState());
- }
- public function testRunnerCanRunTaskWithInvalidProbeStateAndTaskFailureEnabled(): void
- {
- $worker = $this->createMock(WorkerInterface::class);
- $response = $this->createMock(ResponseInterface::class);
- $response->expects(self::once())->method('toArray')->with(self::equalTo(true))->willReturn([
- /**
- * @throws Exception {@see Scheduler::__construct()}
- * @throws Throwable {@see SchedulerInterface::schedule()}
- */
- public function testSchedulerCanScheduleTasksWithBeforeSchedulingNotificationAndWithNotifier(): void
- {
- $notification = $this->createMock(Notification::class);
- $recipient = $this->createMock(Recipient::class);
- $notifier = $this->createMock(NotifierInterface::class);
- self::assertFalse($normalizer->supportsNormalization(new stdClass()));
- self::assertTrue($normalizer->supportsNormalization(new SchedulerConfiguration(new DateTimeZone('UTC'), new DateTimeImmutable(), new NullTask('foo'))));
- }
- public function testNormalizerCanNormalize(): void
- {
- $objectNormalizer = new ObjectNormalizer(null, null, null, new PropertyInfoExtractor([], [new PhpDocExtractor(), new ReflectionExtractor()]));
- $notificationTaskBagNormalizer = new NotificationTaskBagNormalizer($objectNormalizer);
- $lockTaskBagNormalizer = new AccessLockBagNormalizer($objectNormalizer);
- ]));
- self::assertSame('nice', $cacheTransport->getExecutionMode());
- }
- public function testTransportCannotReturnUndefinedTask(): void
- {
- $objectNormalizer = new ObjectNormalizer(null, null, null, new PropertyInfoExtractor([], [new PhpDocExtractor(), new ReflectionExtractor()]));
- $lockTaskBagNormalizer = new AccessLockBagNormalizer($objectNormalizer);
- $serializer = new Serializer([
- $cacheTransport->update('foo', new ShellTask('foo', []));
- self::assertInstanceOf(ShellTask::class, $cacheTransport->get('foo'));
- }
- public function testTransportCannotPauseAlreadyPausedTask(): void
- {
- $objectNormalizer = new ObjectNormalizer(null, null, null, new PropertyInfoExtractor([], [new PhpDocExtractor(), new ReflectionExtractor()]));
- $lockTaskBagNormalizer = new AccessLockBagNormalizer($objectNormalizer);
- $serializer = new Serializer([
- $cacheTransport->update('foo', new ShellTask('foo', []));
- self::assertInstanceOf(ShellTask::class, $cacheTransport->get('foo'));
- }
- public function testTransportCannotPauseAlreadyPausedTask(): void
- {
- $objectNormalizer = new ObjectNormalizer(null, null, null, new PropertyInfoExtractor([], [new PhpDocExtractor(), new ReflectionExtractor()]));
- $lockTaskBagNormalizer = new AccessLockBagNormalizer($objectNormalizer);
- $serializer = new Serializer([
- }
- /**
- * @throws Throwable {@see TransportInterface::list()}
- */
- public function testTransportCannotDeleteUndefinedTask(): void
- {
- $objectNormalizer = new ObjectNormalizer(null, null, null, new PropertyInfoExtractor([], [new PhpDocExtractor(), new ReflectionExtractor()]));
- $lockTaskBagNormalizer = new AccessLockBagNormalizer($objectNormalizer);
- $serializer = new Serializer([
- self::expectExceptionMessage('All the transports failed to execute the requested action');
- self::expectExceptionCode(0);
- $failOverTransport->get('foo');
- }
- public function testTransportCanRetrieveTask(): void
- {
- $firstTransport = $this->createMock(TransportInterface::class);
- $firstTransport->expects(self::once())->method('get')
- ->with(self::equalTo('foo'), self::equalTo(false))
- ->willThrowException(new RuntimeException('Task not found'))
- ]), new InMemoryConfiguration());
- self::expectException(TransportException::class);
- self::expectExceptionMessage('All the transports failed to execute the requested action');
- self::expectExceptionCode(0);
- $failOverTransport->update('foo', $task);
- }
- public function testTransportCanUpdateTask(): void
- {
- $task = $this->createMock(TaskInterface::class);
- self::expectException(InvalidArgumentException::class);
- self::expectExceptionMessage('The "bar" task does not exist');
- $filesystemTransport->get('bar');
- }
- public function testTaskCanBeRetrieved(): void
- {
- $objectNormalizer = new ObjectNormalizer(null, null, null, new PropertyInfoExtractor([], [new PhpDocExtractor(), new ReflectionExtractor()]));
- $lockTaskBagNormalizer = new AccessLockBagNormalizer($objectNormalizer);
- $serializer = new Serializer([
- self::expectException(InvalidArgumentException::class);
- self::expectExceptionMessage('The "bar" task does not exist');
- $filesystemTransport->get('bar');
- }
- public function testTaskCanBeRetrieved(): void
- {
- $objectNormalizer = new ObjectNormalizer(null, null, null, new PropertyInfoExtractor([], [new PhpDocExtractor(), new ReflectionExtractor()]));
- $lockTaskBagNormalizer = new AccessLockBagNormalizer($objectNormalizer);
- $serializer = new Serializer([
- self::expectException(InvalidArgumentException::class);
- self::expectExceptionMessage('The "bar" task does not exist');
- $filesystemTransport->get('bar');
- }
- public function testTaskCanBeRetrieved(): void
- {
- $objectNormalizer = new ObjectNormalizer(null, null, null, new PropertyInfoExtractor([], [new PhpDocExtractor(), new ReflectionExtractor()]));
- $lockTaskBagNormalizer = new AccessLockBagNormalizer($objectNormalizer);
- $serializer = new Serializer([
- self::expectException(InvalidArgumentException::class);
- self::expectExceptionMessage('The "bar" task does not exist');
- $filesystemTransport->get('bar');
- }
- public function testTaskCanBeRetrieved(): void
- {
- $objectNormalizer = new ObjectNormalizer(null, null, null, new PropertyInfoExtractor([], [new PhpDocExtractor(), new ReflectionExtractor()]));
- $lockTaskBagNormalizer = new AccessLockBagNormalizer($objectNormalizer);
- $serializer = new Serializer([
- self::expectException(InvalidArgumentException::class);
- self::expectExceptionMessage('The "bar" task does not exist');
- $filesystemTransport->get('bar');
- }
- public function testTaskCanBeRetrieved(): void
- {
- $objectNormalizer = new ObjectNormalizer(null, null, null, new PropertyInfoExtractor([], [new PhpDocExtractor(), new ReflectionExtractor()]));
- $lockTaskBagNormalizer = new AccessLockBagNormalizer($objectNormalizer);
- $serializer = new Serializer([
- }
- /**
- * @throws Throwable {@see TransportInterface::list()}
- */
- public function testTransportCannotReturnAListWithFailingTransports(): void
- {
- $secondTaskList = new TaskList([
- new NullTask('foo'),
- ]);
- new TaskLockBagMiddleware($lockFactory),
- ]), $eventDispatcher, $lockFactory, $logger);
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $configuration->stop();
- $worker->execute($configuration);
- self::assertSame(0, $worker->getConfiguration()->getExecutedTasksCount());
- self::assertNull($worker->getConfiguration()->getForkedFrom());
- new TaskLockBagMiddleware($lockFactory),
- ]), $eventDispatcher, $lockFactory, new NullLogger());
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $configuration->setSleepDurationDelay(5);
- $worker->execute($configuration);
- $forkedWorker = $worker->fork();
- self::assertNotSame($forkedWorker, $worker);
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertNull($worker->getLastExecutedTask());
- self::assertCount(1, $worker->getFailedTasks());
- $failedTask = $worker->getFailedTasks()->get('foo.failed');
- self::assertInstanceOf(FailedTask::class, $failedTask);
- new TaskLockBagMiddleware($lockFactory),
- ]), $eventDispatcher, $lockFactory, $logger);
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $configuration->stop();
- $worker->execute($configuration);
- self::assertNull($worker->getLastExecutedTask());
- self::assertTrue($worker->getConfiguration()->shouldStop());
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertNull($task->getExecutionState());
- self::assertSame(TaskInterface::SUCCEED, $secondTask->getExecutionState());
- self::assertSame($secondTask, $worker->getLastExecutedTask());
- }
- /**
- {
- $task = new NullTask('foo', [
- 'execution_delay' => 1_000_000,
- ]);
- $logger = $this->createMock(LoggerInterface::class);
- $logger->expects(self::never())->method('info');
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $tracker->expects(self::once())->method('startTracking');
- $tracker->expects(self::once())->method('endTracking');
- {
- $task = new NullTask('foo', [
- 'execution_delay' => 1_000_000,
- ]);
- $logger = $this->createMock(LoggerInterface::class);
- $logger->expects(self::never())->method('info');
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $tracker->expects(self::once())->method('startTracking');
- $tracker->expects(self::once())->method('endTracking');
- ]);
- $logger = $this->createMock(LoggerInterface::class);
- $logger->expects(self::never())->method('info');
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $tracker->expects(self::once())->method('startTracking');
- $tracker->expects(self::once())->method('endTracking');
- $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([
- new FirstInFirstOutPolicy(),
- ]);
- $logger = $this->createMock(LoggerInterface::class);
- $logger->expects(self::never())->method('info');
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $tracker->expects(self::once())->method('startTracking');
- $tracker->expects(self::once())->method('endTracking');
- $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([
- new FirstInFirstOutPolicy(),
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertSame($task, $worker->getLastExecutedTask());
- }
- /**
- * @throws Throwable
- */
- $logger = $this->createMock(LoggerInterface::class);
- $logger->expects(self::never())->method('info');
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $tracker->expects(self::never())->method('startTracking')->with(self::equalTo($task));
- $tracker->expects(self::never())->method('endTracking')->with(self::equalTo($task));
- $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([
- new FirstInFirstOutPolicy(),
- ]));
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $tracker->expects(self::never())->method('startTracking')->with(self::equalTo($task));
- $tracker->expects(self::never())->method('endTracking')->with(self::equalTo($task));
- $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([
- new FirstInFirstOutPolicy(),
- ]));
- $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher());
- $scheduler->schedule($task);
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertNull($worker->getLastExecutedTask());
- self::assertCount(1, $worker->getFailedTasks());
- }
- /**
- * @throws Throwable {@see TaskListInterface::add()}
- $scheduler->schedule($validTask);
- $eventDispatcher = new EventDispatcher();
- $eventDispatcher->addSubscriber(new StopWorkerOnTaskLimitSubscriber(2));
- $lockFactory = new LockFactory(new InMemoryStore());
- $worker = new Worker($scheduler, new RunnerRegistry([
- new NullTaskRunner(),
- ]), new ExecutionPolicyRegistry([
- new FiberPolicy(),
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertCount(1, $worker->getFailedTasks());
- self::assertInstanceOf(FailedTask::class, $worker->getFailedTasks()->get('foo.failed'));
- self::assertNotNull($worker->getLastExecutedTask());
- self::assertSame($validTask, $worker->getLastExecutedTask());
- }
- {
- $task = new NullTask('foo', [
- 'before_executing' => static fn (): bool => true,
- ]);
- $logger = $this->createMock(LoggerInterface::class);
- $logger->expects(self::never())->method('info');
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $tracker->expects(self::once())->method('startTracking')->with(self::equalTo($task));
- $tracker->expects(self::once())->method('endTracking')->with(self::equalTo($task));
- {
- $task = new NullTask('foo', [
- 'before_executing' => static fn (): bool => true,
- ]);
- $logger = $this->createMock(LoggerInterface::class);
- $logger->expects(self::never())->method('info');
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $tracker->expects(self::once())->method('startTracking')->with(self::equalTo($task));
- $tracker->expects(self::once())->method('endTracking')->with(self::equalTo($task));
- {
- $task = new NullTask('foo', [
- 'before_executing' => static fn (): bool => true,
- ]);
- $logger = $this->createMock(LoggerInterface::class);
- $logger->expects(self::never())->method('info');
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $tracker->expects(self::once())->method('startTracking')->with(self::equalTo($task));
- $tracker->expects(self::once())->method('endTracking')->with(self::equalTo($task));
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertSame($task, $worker->getLastExecutedTask());
- self::assertCount(0, $worker->getFailedTasks());
- }
- /**
- * @throws Throwable {@see TaskListInterface::add()}
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertCount(1, $worker->getFailedTasks());
- self::assertInstanceOf(FailedTask::class, $worker->getFailedTasks()->get('foo.failed'));
- self::assertNotNull($worker->getLastExecutedTask());
- self::assertSame($validTask, $worker->getLastExecutedTask());
- }
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertCount(0, $worker->getFailedTasks());
- self::assertSame($task, $worker->getLastExecutedTask());
- }
- /**
- * @throws Throwable {@see TaskListInterface::add()}
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertSame(1, $worker->getConfiguration()->getExecutedTasksCount());
- self::assertInstanceOf(NullTask::class, $worker->getLastExecutedTask());
- $task = $scheduler->getTasks()->get('foo');
- self::assertFalse($task->isSingleRun());
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertSame(0, $worker->getConfiguration()->getExecutedTasksCount());
- self::assertNull($worker->getLastExecutedTask());
- }
- /**
- * @throws Throwable {@see TaskListInterface::add()}
- }
- /**
- * @throws Throwable {@see TaskListInterface::add()}
- */
- public function testTaskCanBeExecutedAndTheWorkerCanReturnTheLastExecutedTask(): void
- {
- $task = new NullTask('foo');
- $logger = $this->createMock(LoggerInterface::class);
- $logger->expects(self::never())->method('info');
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertSame($task, $worker->getLastExecutedTask());
- }
- /**
- * @throws Throwable {@see TaskListInterface::add()}
- */
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertSame($shellTask, $worker->getLastExecutedTask());
- }
- /**
- * @throws Throwable {@see TaskListInterface::add()}
- */
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- $failedTasks = $worker->getFailedTasks();
- self::assertCount(1, $failedTasks);
- $failedTask = $failedTasks->get('foo.failed');
- self::assertInstanceOf(FailedTask::class, $failedTask);
- self::assertNotEmpty($worker->getFailedTasks());
- }
- /**
- * @throws Throwable {@see TaskListInterface::add()}
- */
- public function testTaskCanBeExecutedWithoutBeforeExecutionNotificationAndNotifier(): void
- {
- $task = new NullTask('foo');
- $logger = $this->createMock(LoggerInterface::class);
- $logger->expects(self::never())->method('info');
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertSame($task, $worker->getLastExecutedTask());
- self::assertCount(0, $worker->getFailedTasks());
- }
- /**
- * @throws Throwable {@see TaskListInterface::add()}
- ]);
- $logger = $this->createMock(LoggerInterface::class);
- $logger->expects(self::never())->method('info');
- $notifier = $this->createMock(NotifierInterface::class);
- $notifier->expects(self::once())->method('send')->with(self::equalTo($notification), $recipient);
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $tracker->expects(self::once())->method('startTracking');
- $tracker->expects(self::once())->method('endTracking');
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertSame($task, $worker->getLastExecutedTask());
- self::assertCount(0, $worker->getFailedTasks());
- }
- /**
- * @throws Throwable
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertSame($task, $worker->getLastExecutedTask());
- self::assertCount(0, $worker->getFailedTasks());
- }
- /**
- * @throws Throwable
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertSame($task, $worker->getLastExecutedTask());
- self::assertCount(0, $worker->getFailedTasks());
- }
- /**
- * @throws Throwable
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertSame($task, $worker->getLastExecutedTask());
- }
- /**
- * @throws Throwable
- */
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertSame($task, $worker->getLastExecutedTask());
- }
- /**
- * @throws Throwable
- */
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertNull($worker->getLastExecutedTask());
- self::assertCount(1, $worker->getFailedTasks());
- $failedTask = $worker->getFailedTasks()->get('foo.failed');
- self::assertInstanceOf(FailedTask::class, $failedTask);
- self::assertSame($task, $failedTask->getTask());
- new TaskLockBagMiddleware($lockFactory),
- ]), $eventDispatcher, $lockFactory, $logger);
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration, $task);
- self::assertCount(0, $worker->getFailedTasks());
- self::assertSame($task, $worker->getLastExecutedTask());
- }
- new TaskLockBagMiddleware($lockFactory),
- ]), $eventDispatcher, $lockFactory, $logger);
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration, $task);
- self::assertCount(1, $worker->getFailedTasks());
- self::assertNull($worker->getLastExecutedTask());
- }
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertNull($worker->getLastExecutedTask());
- }
- /**
- * @throws Throwable
- */
- }
- /**
- * @throws Throwable
- */
- public function testWorkerCanExecuteChainedTasks(): void
- {
- $chainedTask = new ChainedTask(
- 'foo',
- new ShellTask('chained_foo', ['ls', '-al']),
- new ShellTask('chained_bar', ['ls', '-al'])
- new TaskLockBagMiddleware($lockFactory),
- ]), $eventDispatcher, $lockFactory, $logger);
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertSame($shellTask, $worker->getLastExecutedTask());
- self::assertSame(TaskInterface::SUCCEED, $chainedTask->getExecutionState());
- self::assertNotNull($chainedTask->getExecutionStartTime());
- self::assertNotNull($chainedTask->getExecutionEndTime());
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertSame($shellTask, $worker->getLastExecutedTask());
- self::assertSame(TaskInterface::SUCCEED, $chainedTask->getExecutionState());
- self::assertNotNull($chainedTask->getExecutionStartTime());
- self::assertNotNull($chainedTask->getExecutionEndTime());
- $chainedFooTask = $chainedTask->getTask('chained_foo');
- new TaskLockBagMiddleware($lockFactory),
- ]), $eventDispatcher, $lockFactory, $logger);
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $configuration->mustRetrieveTasksLazily(true);
- $worker->execute($configuration);
- self::assertSame($shellTask, $worker->getLastExecutedTask());
- self::assertSame(TaskInterface::SUCCEED, $chainedTask->getExecutionState());
- self::assertNotNull($chainedTask->getExecutionStartTime());
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertSame($task, $worker->getLastExecutedTask());
- self::assertNull($task->getAccessLockBag());
- self::assertFalse($worker->getConfiguration()->isRunning());
- self::assertSame(1, $worker->getConfiguration()->getExecutedTasksCount());
- }
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertCount(0, $worker->getFailedTasks());
- self::assertSame($task, $worker->getLastExecutedTask());
- self::assertFalse($worker->getConfiguration()->isRunning());
- self::assertSame(1, $worker->getConfiguration()->getExecutedTasksCount());
- }
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertCount(0, $worker->getFailedTasks());
- self::assertNull($worker->getLastExecutedTask());
- self::assertTrue($worker->getConfiguration()->shouldStop());
- self::assertFalse($worker->getConfiguration()->isRunning());
- self::assertSame(0, $worker->getConfiguration()->getExecutedTasksCount());
- }
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertCount(1, $worker->getFailedTasks());
- self::assertNull($worker->getLastExecutedTask());
- self::assertFalse($worker->getConfiguration()->isRunning());
- self::assertSame(0, $worker->getConfiguration()->getExecutedTasksCount());
- }
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertCount(0, $worker->getFailedTasks());
- self::assertFalse($worker->getConfiguration()->isRunning());
- self::assertNull($worker->getLastExecutedTask());
- self::assertSame(0, $worker->getConfiguration()->getExecutedTasksCount());
- self::assertSame(0, $worker->getConfiguration()->getExecutedTasksCount());
- }
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertCount(0, $worker->getFailedTasks());
- self::assertSame(1, $worker->getConfiguration()->getExecutedTasksCount());
- self::assertFalse($worker->isRunning());
- }
- /**
- $configuration = WorkerConfiguration::create();
- $configuration->setExecutionPolicy('fiber');
- $worker->execute($configuration);
- self::assertFalse($worker->isRunning());
- self::assertCount(0, $worker->getFailedTasks());
- self::assertTrue($worker->getConfiguration()->shouldStop());
- }
- /**
- }
- /**
- * @throws Throwable {@see WorkerInterface::preempt()}
- */
- public function testWorkerCanPreempt(): void
- {
- $logger = $this->createMock(LoggerInterface::class);
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([
- new TaskUpdateMiddleware($transport),
- new TaskLockBagMiddleware($lockFactory),
- ]), new EventDispatcher(), $lockFactory, $logger);
- $worker->getConfiguration()->setExecutionPolicy('fiber');
- $barTask = new NullTask('bar');
- $randomTask = new NullTask('random');
- $preemptList = new TaskList([
- new NullTask('foo'),
- $barTask,
- $logger = $this->createMock(LoggerInterface::class);
- $logger->expects(self::never())->method('info');
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $tracker->expects(self::never())->method('startTracking')->with(self::equalTo($task));
- $tracker->expects(self::never())->method('endTracking')->with(self::equalTo($task));
- $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([
- new FirstInFirstOutPolicy(),
- ]));
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $tracker->expects(self::never())->method('startTracking')->with(self::equalTo($task));
- $tracker->expects(self::never())->method('endTracking')->with(self::equalTo($task));
- $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([
- new FirstInFirstOutPolicy(),
- ]));
- $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher());
- $scheduler->schedule($task);
- $scheduler->schedule($validTask);
- $eventDispatcher = new EventDispatcher();
- $eventDispatcher->addSubscriber(new StopWorkerOnTaskLimitSubscriber(2));
- $lockFactory = new LockFactory(new InMemoryStore());
- $worker = new Worker($scheduler, new RunnerRegistry([
- new NullTaskRunner(),
- ]), new ExecutionPolicyRegistry([
- new DefaultPolicy(),
- }
- /**
- * @throws Throwable {@see WorkerInterface::preempt()}
- */
- public function testWorkerCanPreempt(): void
- {
- $logger = $this->createMock(LoggerInterface::class);
- $tracker = $this->createMock(TaskExecutionTrackerInterface::class);
- $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([