Orleans is an actor-model framework for building concurrent and scalable distributed applications.
Grains (Orleans’s actors) communicate with each other via messages, and each message has to be processed within the configured ResponseTimeout.
If a message is not processed within that timeout, Orleans throws a timeout exception.
But what if a grain needs to perform a long-running task that can’t complete within that timeframe?
Let’s dive into the solution approach.
Setup
This implementation is based on the following technology stacks:
If you are not familiar with Orleans, you can start from the documentation: https://dotnet.github.io/orleans/.
Long-running tasks in grains
The Orleans runtime expects a grain to respond to each message quickly. How quickly is governed by the ResponseTimeout setting, which is global at the silo and client level.
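For reference, the timeout is set through the messaging options. The fragment below is a minimal sketch, assuming Orleans 3.x-style builders; the helper class, its method names, and the 60-second value are made up for illustration.

```csharp
using System;
using Orleans;
using Orleans.Configuration;
using Orleans.Hosting;

// Hypothetical helper class; only the options types and ResponseTimeout come from Orleans.
public static class ResponseTimeoutSetup
{
    // Silo side: applies to every call the silo sends.
    public static ISiloBuilder WithResponseTimeout(ISiloBuilder siloBuilder) =>
        siloBuilder.Configure<SiloMessagingOptions>(options =>
            options.ResponseTimeout = TimeSpan.FromSeconds(60)); // arbitrary value

    // Client side: applies to every call the client sends.
    public static IClientBuilder WithResponseTimeout(IClientBuilder clientBuilder) =>
        clientBuilder.Configure<ClientMessagingOptions>(options =>
            options.ResponseTimeout = TimeSpan.FromSeconds(60)); // arbitrary value
}
```

Because the setting applies to every grain call, raising it is a blunt tool when only one operation is slow.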
To overcome this limitation, long-running work can be offloaded as a background task onto the default .NET task scheduler.
In the following implementation, the BackgroundWorkload grain declares an abstract method, ProcessAsync, which returns the long-running task. ProcessAsync should be overridden in a derived class.
When the ProcessAsync task completes or fails, the background task sends a message back to the BackgroundWorkload grain.
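The scheduler hop at the heart of this approach can be tried without Orleans. The sketch below is an illustration only: it uses ConcurrentExclusiveSchedulerPair from the .NET base library as a stand-in for the grain scheduler (an assumption, since Orleans has its own scheduler), and the class and method names are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerHopDemo
{
    // Returns three flags: started on the grain-like scheduler, ran the work on the
    // default scheduler, and came back to the grain-like scheduler afterwards.
    public static Task<(bool startedOnGrainLike, bool workOnDefault, bool backOnGrainLike)> RunAsync()
    {
        // Stand-in for the grain scheduler (assumption, not an Orleans type).
        var grainLikeScheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;

        return Task.Factory.StartNew(async () =>
        {
            // Capture the scheduler while still "inside the grain".
            var captured = TaskScheduler.Current;
            bool started = ReferenceEquals(captured, grainLikeScheduler);
            bool work = false;
            bool back = false;

            // Task.Run always uses TaskScheduler.Default, so the work leaves the captured scheduler.
            await Task.Run(async () =>
            {
                work = ReferenceEquals(TaskScheduler.Current, TaskScheduler.Default);

                // Hop back by starting a task explicitly on the captured scheduler.
                back = await Task.Factory.StartNew(
                    () => ReferenceEquals(TaskScheduler.Current, captured),
                    CancellationToken.None, TaskCreationOptions.None, captured);
            });

            return (started, work, back);
        }, CancellationToken.None, TaskCreationOptions.None, grainLikeScheduler).Unwrap();
    }
}
```

All three flags should come back true: the work runs on the default scheduler, and the final step runs on the captured one again.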
The following snippet is a base class for a long-running workload.
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Orleans;

    public abstract class BackgroundWorkload<TRequest, TResponse> : Grain, IBackgroundWorkload<TRequest, TResponse>
    {
        // Last known state of the workload: None, Started, Completed or Failed
        private IResult _result = new None();

        // Non-null while a background task is in flight
        private Task? _task;

        public Task<bool> StartAsync(TRequest request)
        {
            return StartAsync(request, CancellationToken.None);
        }

        public Task<bool> StartAsync(TRequest request, GrainCancellationToken cancellationToken)
        {
            return StartAsync(request, cancellationToken.CancellationToken);
        }

        // Start the long-running task; returns false if one is already running
        private Task<bool> StartAsync(TRequest request, CancellationToken cancellationToken)
        {
            if (_task != null)
            {
                return Task.FromResult(false);
            }

            _result = new Started();
            // TaskScheduler.Current is the grain's scheduler here; capture it for the callback
            _task = CreateTask(request, cancellationToken, TaskScheduler.Current);
            return Task.FromResult(true);
        }

        // Run the workload on the default .NET task scheduler, outside the grain's scheduler
        private Task CreateTask(TRequest request, CancellationToken cancellationToken, TaskScheduler orleansTaskScheduler) =>
            Task.Run(async () =>
            {
                try
                {
                    var response = await ProcessAsync(request, cancellationToken);
                    // After the workload completes, send the grain message CompleteAsync
                    await InvokeGrainAsync(orleansTaskScheduler, grain => grain.CompleteAsync(response));
                }
                catch (Exception exception)
                {
                    // After the workload fails, send the grain message FailedAsync
                    await InvokeGrainAsync(orleansTaskScheduler, grain => grain.FailedAsync(exception));
                }
            });

        // Execute the action on the grain's task scheduler.
        // StartNew with an async lambda returns Task<Task>; Unwrap makes callers await the grain call itself.
        private Task InvokeGrainAsync(TaskScheduler orleansTaskScheduler, Func<IBackgroundWorkload<TRequest, TResponse>, Task> action) =>
            Task.Factory.StartNew(async () =>
            {
                var grain = GrainFactory.GetGrain<IBackgroundWorkload<TRequest, TResponse>>(this.GetPrimaryKeyString());
                await action(grain);
            }, CancellationToken.None, TaskCreationOptions.None, orleansTaskScheduler).Unwrap();

        // Stores the long-running task result in the grain state
        public Task CompleteAsync(TResponse response)
        {
            if (!(_result is Started))
            {
                return Task.CompletedTask;
            }

            _task = null;
            _result = new Completed<TResponse>(response);
            return Task.CompletedTask;
        }

        // Stores the long-running task failure in the grain state
        public Task FailedAsync(Exception exception)
        {
            if (!(_result is Started))
            {
                return Task.CompletedTask;
            }

            _task = null;
            _result = new Failed(exception);
            return Task.CompletedTask;
        }

        // Returns the current grain state
        public Task<IResult> GetResultAsync()
        {
            return Task.FromResult(_result);
        }

        // The long-running work itself, supplied by a derived class
        protected abstract Task<TResponse> ProcessAsync(TRequest request, CancellationToken cancellationToken);
    }
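The base class refers to IResult, None, Started, Completed and Failed, which are not listed in this post. The following is a hypothetical reconstruction inferred only from how the snippets use them (the Response and Exception properties appear in the client code); the real definitions live in the linked repository and may differ, and the [Serializable] attributes are an assumption about how the types are made serializable.

```csharp
using System;

// Hypothetical marker interface for every workload state.
public interface IResult { }

// No workload has been started yet.
[Serializable]
public sealed class None : IResult { }

// A workload is currently running.
[Serializable]
public sealed class Started : IResult { }

// The workload finished and produced a response.
[Serializable]
public sealed class Completed<TResponse> : IResult
{
    public Completed(TResponse response) => Response = response;

    public TResponse Response { get; }
}

// The workload threw an exception.
[Serializable]
public sealed class Failed : IResult
{
    public Failed(Exception exception) => Exception = exception;

    public Exception Exception { get; }
}
```

Modelling each state as its own type is what lets the caller branch on the result with a type-pattern switch.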
Implementation details
Orleans can activate grains on different physical machines, and all communication between them happens via messages, so every message parameter must be serializable.
A Task, Func, or Action, together with the context it captures, cannot be serialized into a grain message.
That leaves inheritance as the way to inject the workload and keep the grain reusable.
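To make the serialization constraint concrete, here is a sketch of what the grain interface might look like. It is a hypothetical reconstruction from the public methods of BackgroundWorkload and the string key used by the client, not the author's actual definition; IResult is declared here only as a placeholder so the fragment stands alone.

```csharp
using System;
using System.Threading.Tasks;
using Orleans;

// Placeholder for the result marker interface (assumption; defined elsewhere in the real code).
public interface IResult { }

// Hypothetical grain interface: every parameter and return value is plain data
// (TRequest, TResponse, Exception, IResult), never a Task, Func or Action.
public interface IBackgroundWorkload<TRequest, TResponse> : IGrainWithStringKey
{
    Task<bool> StartAsync(TRequest request);

    // GrainCancellationToken is the Orleans token type that can be passed in grain calls.
    Task<bool> StartAsync(TRequest request, GrainCancellationToken cancellationToken);

    Task CompleteAsync(TResponse response);

    Task FailedAsync(Exception exception);

    Task<IResult> GetResultAsync();
}
```

The workload itself never crosses the wire; only the request, the response, or the exception does.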
Here is an example of a long-running task implementation:
    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public class LongRunningWorkload : BackgroundWorkload<int, string>
    {
        // Input value that makes the workload fail
        public const int InvalidValue = 100;

        protected override async Task<string> ProcessAsync(int value, CancellationToken cancellationToken)
        {
            if (value == InvalidValue)
            {
                throw new ArgumentException("Unexpected value");
            }

            // Simulate four seconds of work
            await Task.Delay(4000, cancellationToken);
            return $"Long running task with value {value} is completed";
        }
    }
Grain client
The grain client is a simple loop that polls grain.GetResultAsync until it returns a Completed or Failed result.
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Mvc;
    using Orleans;

    public class TaskController : ControllerBase
    {
        private readonly IGrainFactory _grainFactory;

        public TaskController(IGrainFactory grainFactory)
        {
            _grainFactory = grainFactory;
        }

        [HttpGet]
        public async Task<IActionResult> Get(int value)
        {
            var grain = _grainFactory.GetGrain<IBackgroundWorkload<int, string>>("Test");
            // Returns immediately; the work continues in the background
            await grain.StartAsync(value);

            // Poll the grain until the workload reaches a final state
            while (true)
            {
                var result = await grain.GetResultAsync();
                switch (result)
                {
                    case Started _:
                        // Still running: wait before the next poll
                        await Task.Delay(100);
                        break;
                    case Completed<string> completed:
                        return Ok($"Completed: {completed.Response}");
                    case Failed failed:
                        return Ok($"Failed: {failed.Exception}");
                }
            }
        }
    }
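The loop above polls for as long as the workload stays in the Started state. If an upper bound is wanted, the polling can be factored into a small helper. The following is a minimal sketch that does not depend on Orleans; the Polling class, the PollUntilAsync name, and its parameters are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Polling
{
    // Calls poll repeatedly until isDone accepts the value.
    // Throws OperationCanceledException if the timeout elapses first.
    public static async Task<T> PollUntilAsync<T>(
        Func<Task<T>> poll,
        Func<T, bool> isDone,
        TimeSpan interval,
        TimeSpan timeout)
    {
        // The token is cancelled automatically once the timeout has passed.
        using var timeoutSource = new CancellationTokenSource(timeout);

        while (true)
        {
            var value = await poll();
            if (isDone(value))
            {
                return value;
            }

            // Wait before the next poll; a cancelled token makes this throw.
            await Task.Delay(interval, timeoutSource.Token);
        }
    }
}
```

In the controller, poll would wrap grain.GetResultAsync and isDone would accept a Completed or Failed result, with the caller deciding how to report a timeout.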
Results
BackgroundWorkload implements a simple state machine that stores the execution state of the long-running task, while the client code polls until the task completes.
Every Orleans message is processed quickly and doesn’t block the Orleans task scheduler, which is not designed to handle long-running tasks.
Complete solution with source code and tests can be found at https://github.com/Igor-Pchelko/blog-orleans-long-running-tasks.