Orleans long-running task

Orleans is an actor model framework for building concurrent and scalable distributed applications. Grains (also known as actors) communicate with each other via messages, and each message must be processed within the configured ResponseTimeout. If a call is not answered within that timeout, Orleans throws a TimeoutException.
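
To make the failure mode concrete, here is a plain .NET analogy. It is not Orleans code and not how Orleans implements the timeout: the caller races the call against a timer and gives up with a TimeoutException. The names ResponseTimeoutDemo, CallWithTimeoutAsync and HandleMessageAsync are hypothetical. In this sketch the timeout only ends the caller's wait; the slow work itself keeps running in the background.

```csharp
using System;
using System.Threading.Tasks;

// Plain .NET analogy of a response timeout (hypothetical names, not Orleans code).
public static class ResponseTimeoutDemo
{
    // Waits for 'call', but throws TimeoutException if it takes longer than 'timeout'.
    public static async Task<T> CallWithTimeoutAsync<T>(Task<T> call, TimeSpan timeout)
    {
        Task finished = await Task.WhenAny(call, Task.Delay(timeout));
        if (finished != call)
        {
            // The caller stops waiting; 'call' itself keeps running.
            throw new TimeoutException($"No response within {timeout}.");
        }
        return await call;
    }

    // Stands in for a grain method whose work takes 'duration'.
    public static async Task<string> HandleMessageAsync(TimeSpan duration)
    {
        await Task.Delay(duration);
        return "done";
    }
}
```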

But what if a grain needs to perform a long-running task that can’t complete within that timeframe? Let’s dive into a solution.

Setup

This implementation is based on the following technology stack:

If you are not familiar with Orleans, you can start with the documentation: https://dotnet.github.io/orleans/.

Long-running tasks in grains

Orleans expects every grain call to be answered promptly. How long a caller waits is governed by the ResponseTimeout setting, which is configured globally at the silo or client level.

Inspirations

There have been several discussions and suggestions on how to implement long-running tasks:

There is also a still-open issue, "Create Long-Running Task Orchestration Sample".

To work around this limitation, the long-running work can be offloaded as a background task to the default .NET task scheduler (the thread pool) instead of running on the Orleans task scheduler.

In the following implementation, the BackgroundWorkload grain declares the abstract method ProcessAsync, which returns the long-running task. A derived class overrides ProcessAsync. When the ProcessAsync task completes or fails, the background task sends a response message (CompleteAsync or FailedAsync) back to the BackgroundWorkload grain.

The following snippet is a base class for a long-running workload.

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Orleans;

    public abstract class BackgroundWorkload<TRequest, TResponse> : Grain, IBackgroundWorkload<TRequest, TResponse>
    {
        private IResult _result = new None();
        private Task? _task;

        public Task<bool> StartAsync(TRequest request)
        {
            return StartAsync(request, CancellationToken.None);
        }

        public Task<bool> StartAsync(TRequest request, GrainCancellationToken cancellationToken)
        {
            return StartAsync(request, cancellationToken.CancellationToken);
        }

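        // Starts the long-running task; returns false if a task is already running
        private Task<bool> StartAsync(TRequest request, CancellationToken cancellationToken)
        {
            if (_task != null)
            {
                return Task.FromResult(false);
            }

            _result = new Started();
            // Inside a grain call, TaskScheduler.Current is the grain's Orleans scheduler;
            // capture it so the background task can send its result back through it
            _task = CreateTask(request, cancellationToken, TaskScheduler.Current);

            return Task.FromResult(true);
        }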

        // Runs the workload on the default (thread pool) task scheduler, outside the grain's scheduler
        private Task CreateTask(TRequest request, CancellationToken cancellationToken, TaskScheduler orleansTaskScheduler) =>
            Task.Run(async () =>
            {
                try
                {
                    var response = await ProcessAsync(request, cancellationToken);
                    // When the workload completes, send the CompleteAsync message to the grain
                    await InvokeGrainAsync(orleansTaskScheduler, grain => grain.CompleteAsync(response));
                }
                catch (Exception exception)
                {
                    // When the workload fails, send the FailedAsync message to the grain
                    await InvokeGrainAsync(orleansTaskScheduler, grain => grain.FailedAsync(exception));
                }
            });

        // Executes the grain call on the grain's Orleans task scheduler.
        // StartNew with an async lambda returns Task<Task>; Unwrap() yields a task that
        // completes only when the grain call itself has completed (or faulted).
        private Task InvokeGrainAsync(TaskScheduler orleansTaskScheduler, Func<IBackgroundWorkload<TRequest, TResponse>, Task> action) =>
            Task.Factory.StartNew(async () =>
            {
                var grain = GrainFactory.GetGrain<IBackgroundWorkload<TRequest, TResponse>>(this.GetPrimaryKeyString());
                await action(grain);
            }, CancellationToken.None, TaskCreationOptions.None, orleansTaskScheduler).Unwrap();

        // Stores the long-running task result in the grain's in-memory state
        public Task CompleteAsync(TResponse response)
        {
            if (!(_result is Started))
            {
                return Task.CompletedTask;
            }

            _task = null;
            _result = new Completed<TResponse>(response);
            return Task.CompletedTask;
        }

        // Stores the long-running task failure in the grain's in-memory state
        public Task FailedAsync(Exception exception)
        {
            if (!(_result is Started))
            {
                return Task.CompletedTask;
            }

            _task = null;
            _result = new Failed(exception);
            return Task.CompletedTask;
        }

        // Returns current grain state
        public Task<IResult> GetResultAsync()
        {
            return Task.FromResult(_result);
        }

        // The long-running workload; derived classes override it
        protected abstract Task<TResponse> ProcessAsync(TRequest request, CancellationToken cancellationToken);
    }

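The scheduler hand-off in the base class can be tried without a silo. The sketch below is a plain .NET approximation, not Orleans code: as an assumption made for illustration, the ExclusiveScheduler of a ConcurrentExclusiveSchedulerPair stands in for the grain's Orleans scheduler, since it runs one task at a time. The class OffloadingWorker and its string states are hypothetical. StartAsync returns as soon as the work is queued on the thread pool with Task.Run, and the result is written back through a task started on the exclusive scheduler, mirroring CreateTask and InvokeGrainAsync.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Plain .NET sketch, not Orleans: the exclusive scheduler runs one task at a time,
// standing in for a grain's scheduler. OffloadingWorker is a hypothetical name.
public sealed class OffloadingWorker
{
    private readonly TaskScheduler _grainScheduler =
        new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;

    // Read and written only by tasks running on _grainScheduler.
    private string _state = "None";

    // Queues 'action' on the emulated grain scheduler (like InvokeGrainAsync).
    private Task OnGrainAsync(Action action) =>
        Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.None, _grainScheduler);

    // Reads the state on the emulated grain scheduler (like GetResultAsync).
    public Task<string> GetStateAsync() =>
        Task.Factory.StartNew(() => _state, CancellationToken.None, TaskCreationOptions.None, _grainScheduler);

    // Marks the work as started, offloads it to the thread pool and returns at once.
    public Task StartAsync(Func<Task<string>> work) => OnGrainAsync(() =>
    {
        _state = "Started";
        // Task.Run always uses TaskScheduler.Default (the thread pool).
        _ = Task.Run(async () =>
        {
            try
            {
                var response = await work();
                await OnGrainAsync(() => _state = $"Completed: {response}");
            }
            catch (Exception exception)
            {
                await OnGrainAsync(() => _state = $"Failed: {exception.Message}");
            }
        });
    });
}
```

Running every state read and write on the same non-concurrent scheduler is what keeps the state free of races without locks, while the slow work never occupies that scheduler.
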
Implementation details

Orleans can activate grains on different physical machines, and all communication between grains goes through messages. Therefore, every message parameter must be serializable. A Task, Func, or Action, along with the context it captures, cannot be serialized, so it cannot be passed to a grain in a message.

Within this design, inheritance is the only way to inject the workload and keep the grain reusable.

Here is an example of a long-running task implementation:

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public class LongRunningWorkload : BackgroundWorkload<int, string>
    {
        // A value that makes the workload fail
        public const int InvalidValue = 100;

        protected override async Task<string> ProcessAsync(int value, CancellationToken cancellationToken)
        {
            if (value == InvalidValue)
            {
                throw new ArgumentException("Unexpected value");
            }

            // Simulates 4 seconds of long-running work; honours cancellation
            await Task.Delay(4000, cancellationToken);

            return $"Long running task with value {value} is completed";
        }
    }

Grain client

The grain client is a simple loop that polls grain.GetResultAsync until it returns a Completed or Failed result.

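    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Mvc;
    using Orleans;

    public class TaskController : ControllerBase
    {
        private readonly IGrainFactory _grainFactory;

        public TaskController(IGrainFactory grainFactory)
        {
            _grainFactory = grainFactory;
        }

        [HttpGet]
        public async Task<IActionResult> Get(int value)
        {
            // Every request uses the same grain key "Test", so concurrent requests share one workload
            var grain = _grainFactory.GetGrain<IBackgroundWorkload<int, string>>("Test");

            // Returns false if a task is already running; polling then reports that task's result
            await grain.StartAsync(value);

            while (true)
            {
                var result = await grain.GetResultAsync();
                switch (result)
                {
                    case Started _:
                        // Still running: wait briefly before polling again
                        await Task.Delay(100);
                        break;

                    case Completed<string> completed:
                        return Ok($"Completed: {completed.Response}");

                    case Failed failed:
                        return Ok($"Failed: {failed.Exception}");

                    default:
                        // None: the activation no longer holds the task state (e.g. it was
                        // deactivated and reactivated); without this case the loop would spin forever
                        return NotFound("No result is available for this task");
                }
            }
        }
    }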

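The controller loop places no upper bound on how long it waits. One way to bound it, sketched here as an assumption rather than part of the original solution, is a small generic helper; PollUntilAsync and its parameters are hypothetical names. It polls at a fixed interval and throws a TimeoutException once the overall timeout elapses, while cancellation of a caller-supplied CancellationToken (for example, the request's abort token) still surfaces as an OperationCanceledException.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Polling
{
    // Calls 'poll' every 'interval' until 'isDone' accepts a result or 'timeout' elapses.
    public static async Task<T> PollUntilAsync<T>(
        Func<Task<T>> poll,
        Func<T, bool> isDone,
        TimeSpan interval,
        TimeSpan timeout,
        CancellationToken cancellationToken = default)
    {
        using var timeoutSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        timeoutSource.CancelAfter(timeout);

        while (true)
        {
            var result = await poll();
            if (isDone(result))
            {
                return result;
            }

            try
            {
                await Task.Delay(interval, timeoutSource.Token);
            }
            catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
            {
                // Only the timeout fired, not the caller's token.
                throw new TimeoutException($"No final result within {timeout}.");
            }
        }
    }
}
```

In the controller, poll could be `() => grain.GetResultAsync()` and isDone could be `r => !(r is Started)`, which also stops when the result is None.
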
Results

BackgroundWorkload implements a simple state machine (None, Started, then Completed or Failed) that stores the execution state of the long-running task, while the client code polls until the task completes. Every Orleans message is processed quickly, so none of them blocks the Orleans task scheduler, which is not designed to run long-running tasks.
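
The IResult types used by BackgroundWorkload are not listed here. The sketch below is a hypothetical reconstruction inferred from how the code uses them (None, Started, Completed<TResponse> with a Response property, Failed with an Exception property), with the state machine's transitions written as pure functions; WorkloadStateMachine is a made-up name. In a real Orleans application these types would also have to be serializable, which the sketch leaves out.

```csharp
using System;

// Hypothetical result types, inferred from how BackgroundWorkload and the controller use them.
public interface IResult { }

public sealed class None : IResult { }

public sealed class Started : IResult { }

public sealed class Completed<TResponse> : IResult
{
    public Completed(TResponse response) => Response = response;
    public TResponse Response { get; }
}

public sealed class Failed : IResult
{
    public Failed(Exception exception) => Exception = exception;
    public Exception Exception { get; }
}

// The transitions BackgroundWorkload applies to its state, as pure functions.
public static class WorkloadStateMachine
{
    // StartAsync: refused while a task is running (Started); allowed from None, Completed or Failed.
    public static bool TryStart(ref IResult state)
    {
        if (state is Started)
        {
            return false;
        }
        state = new Started();
        return true;
    }

    // CompleteAsync: only a running task can complete; otherwise the state is unchanged.
    public static IResult Complete<TResponse>(IResult state, TResponse response) =>
        state is Started ? new Completed<TResponse>(response) : state;

    // FailedAsync: only a running task can fail; otherwise the state is unchanged.
    public static IResult Fail(IResult state, Exception exception) =>
        state is Started ? new Failed(exception) : state;
}
```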

The complete solution, with source code and tests, is available at https://github.com/Igor-Pchelko/blog-orleans-long-running-tasks.