diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile
new file mode 100644
index 0000000..7d0ff28
--- /dev/null
+++ b/.devcontainer/Dockerfile
@@ -0,0 +1,7 @@
+FROM mcr.microsoft.com/dotnet/sdk:10.0-noble
+
+RUN apt-get update \
+ && export DEBIAN_FRONTEND=noninteractive \
+ && apt-get -y install --no-install-recommends git octave \
+ && apt-get clean \
+ && rm -rf /var/lib/apt/lists/*
diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
new file mode 100644
index 0000000..7b1fe7a
--- /dev/null
+++ b/.devcontainer/devcontainer.json
@@ -0,0 +1,8 @@
+{
+ "name": "Octave.NET",
+ "build": {
+ "context": "..",
+ "dockerfile": "Dockerfile"
+ },
+ "postCreateCommand": "dotnet --info && octave-cli --version && dotnet restore src/Octave.NET.sln"
+}
diff --git a/.travis.yml b/.travis.yml
index 6a7746b..964b8cf 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,15 +1,13 @@
-language: csharp
-dist: trusty
-mono: none
-dotnet: 2.0.0
-before_install:
-- sudo add-apt-repository ppa:octave/stable -y
-- sudo apt-get update -y
-- sudo apt-get install octave -y
+language: minimal
+dist: jammy
-install:
-- cd src
-- dotnet restore
+services:
+ - docker
+
+env:
+ global:
+ - DEVCONTAINER_IMAGE=octave-net-ci
script:
-- dotnet test Octave.NET.Tests
\ No newline at end of file
+ - docker build -f .devcontainer/Dockerfile -t "$DEVCONTAINER_IMAGE" .
+ - docker run --rm -v "$TRAVIS_BUILD_DIR:/workspace" -w /workspace "$DEVCONTAINER_IMAGE" dotnet test src/Octave.NET.Tests/Octave.NET.Tests.csproj --logger "console;verbosity=minimal"
diff --git a/Directory.Build.props b/Directory.Build.props
new file mode 100644
index 0000000..1453c9c
--- /dev/null
+++ b/Directory.Build.props
@@ -0,0 +1,11 @@
+
+
+ <_BuildOperatingSystem Condition="$([MSBuild]::IsOSPlatform('Windows'))">windows
+ <_BuildOperatingSystem Condition="$([MSBuild]::IsOSPlatform('Linux'))">linux
+ <_BuildOperatingSystem Condition="'$(_BuildOperatingSystem)' == ''">unknown
+
+ bin/$(_BuildOperatingSystem)/
+ obj/$(_BuildOperatingSystem)/
+ $(DefaultItemExcludes);bin/**;obj/**
+
+
diff --git a/README.md b/README.md
index 4eb40c5..3a6db98 100644
--- a/README.md
+++ b/README.md
@@ -25,6 +25,18 @@ mathematical problems. This library is an attempt to bridge Octave and .NET worl
or specify path to octave-cli binary in your code
- Check the 'examples' folder
+## Security notes
+
+Octave.NET sends commands directly to an `octave-cli` process. Treat every string passed to `Execute` as executable code. Do not pass untrusted user input directly to Octave unless your application validates it, authorizes it, and accepts the risks of Octave code being able to read files, write files, load packages, or invoke system commands.
+
+For production services, prefer setting `OctaveContext.OctaveSettings.OctaveCliPath` to an absolute path that you control instead of relying on `PATH` lookup. Relying on `PATH` can start an unexpected executable if the process environment is misconfigured or writable by another user.
+
+Worker processes are pooled for performance. Disposing an `OctaveContext` returns the underlying Octave process to the pool, and Octave variables are not cleared automatically. Do not store secrets in Octave variables unless you clear them before disposing the context, for example by executing `clear`. For workloads that must isolate data between users or requests, consider using separate processes or explicitly clearing all state before returning a context.
+
+The asynchronous `Execute(string command, CancellationToken cancellationToken)` overload depends on the supplied cancellation token. If the command never completes and the token is never canceled, the returned task can remain incomplete. Use a `CancellationTokenSource` with an appropriate timeout for untrusted, long-running, or user-controlled commands.
+
+The repository's Dev Container and CI configuration use current package feeds to install .NET and Octave. That is convenient for development, but release pipelines with strict supply-chain requirements should pin image digests and review package updates regularly.
+
## How It's Made?
This library spawns octave processes and controls them via standard streams (stdin, stdout and stderr). To keep optimal performance every time OctaveContext is disposed underlying octave-cli process is returned to the object pool, so we don't waste time on spawning new worker processes.
diff --git a/src/Octave.NET.Tests/ObjectPoolTests.cs b/src/Octave.NET.Tests/ObjectPoolTests.cs
new file mode 100644
index 0000000..d7d24ef
--- /dev/null
+++ b/src/Octave.NET.Tests/ObjectPoolTests.cs
@@ -0,0 +1,226 @@
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Octave.NET.Core.ObjectPooling;
+using System;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Octave.NET.Tests
+{
+ [TestClass]
+ public class ObjectPoolTests
+ {
+ private const int TestTimeoutMS = 5_000;
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public void WhenReleasedItemCanBeReused_ShouldReturnSameItem()
+ {
+ var created = 0;
+
+ using (var pool = new ObjectPool(() => new TestPoolItem(++created), 1_000, false, 1))
+ {
+ var item = pool.GetObject();
+
+ pool.ReleaseObject(item);
+
+ var reusedItem = pool.GetObject();
+
+ Assert.AreSame(item, reusedItem);
+ Assert.AreEqual(1, created);
+
+ pool.ReleaseObject(reusedItem);
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public void WhenReleasedItemCannotBeReused_ShouldDisposeItAndCreateReplacement()
+ {
+ var created = 0;
+
+ using (var pool = new ObjectPool(() => new TestPoolItem(++created), 1_000, false, 1))
+ {
+ var item = pool.GetObject();
+ item.CanBeReused = false;
+
+ pool.ReleaseObject(item);
+
+ Assert.IsTrue(item.IsDisposed);
+
+ var replacement = pool.GetObject();
+
+ Assert.AreNotSame(item, replacement);
+ Assert.AreEqual(2, created);
+
+ pool.ReleaseObject(replacement);
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenPoolAtMaxConcurrency_ShouldWaitUntilLeaseIsReleased()
+ {
+ using (var pool = new ObjectPool(() => new TestPoolItem(), 1_000, false, 1))
+ using (var waiterStarted = new ManualResetEventSlim(false))
+ {
+ var item = pool.GetObject();
+
+ var waitingGet = Task.Run(() =>
+ {
+ waiterStarted.Set();
+ return pool.GetObject();
+ });
+
+ Assert.IsTrue(waiterStarted.Wait(1_000));
+ Assert.IsFalse(waitingGet.Wait(100));
+
+ pool.ReleaseObject(item);
+
+ var acquiredItem = await WaitForTaskAsync(waitingGet).ConfigureAwait(false);
+
+ Assert.AreSame(item, acquiredItem);
+
+ pool.ReleaseObject(acquiredItem);
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenFactoryThrows_ShouldReleaseLease()
+ {
+ var attempts = 0;
+
+ using (var pool = new ObjectPool(() =>
+ {
+ var attempt = Interlocked.Increment(ref attempts);
+
+ if (attempt == 1)
+ throw new InvalidOperationException("Factory failed.");
+
+ return new TestPoolItem(attempt);
+ }, 1_000, false, 1))
+ {
+ Assert.ThrowsExactly(() => pool.GetObject());
+
+ var getTask = Task.Run(() => pool.GetObject());
+ var item = await WaitForTaskAsync(getTask).ConfigureAwait(false);
+
+ Assert.AreEqual(2, item.Id);
+
+ pool.ReleaseObject(item);
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public void WhenDisposed_ShouldDisposePooledItemsAndRejectNewLeases()
+ {
+ var pool = new ObjectPool(() => new TestPoolItem(), 1_000, false, 1);
+ var item = pool.GetObject();
+
+ pool.ReleaseObject(item);
+ pool.Dispose();
+
+ Assert.IsTrue(item.IsDisposed);
+ Assert.ThrowsExactly(() => pool.GetObject());
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public void WhenDisposed_ShouldDisposeCheckedOutItemWhenReturned()
+ {
+ var pool = new ObjectPool(() => new TestPoolItem(), 1_000, false, 1);
+ var item = pool.GetObject();
+
+ pool.Dispose();
+ pool.ReleaseObject(item);
+
+ Assert.IsTrue(item.IsDisposed);
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenIdleReclaimRuns_ShouldDisposeIdlePooledItem()
+ {
+ var created = 0;
+
+ using (var pool = new ObjectPool(() => new TestPoolItem(++created), 10, true, 1, 50))
+ {
+ var item = pool.GetObject();
+
+ pool.ReleaseObject(item);
+
+ Assert.IsTrue(await WaitUntilAsync(() => item.IsDisposed).ConfigureAwait(false));
+
+ var replacementTask = Task.Run(() => pool.GetObject());
+ var replacement = await WaitForTaskAsync(replacementTask).ConfigureAwait(false);
+
+ Assert.AreNotSame(item, replacement);
+ Assert.AreEqual(2, created);
+
+ pool.ReleaseObject(replacement);
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public void WhenDisposed_ShouldCancelReclaimTaskWithoutWaitingForInterval()
+ {
+ var pool = new ObjectPool(() => new TestPoolItem(), 60_000, true, 1, 60_000);
+ var stopwatch = Stopwatch.StartNew();
+
+ pool.Dispose();
+
+ stopwatch.Stop();
+ Assert.IsTrue(stopwatch.ElapsedMilliseconds < 1_000);
+ }
+
+ private static async Task WaitForTaskAsync(Task task)
+ {
+ var completedTask = await Task.WhenAny(task, Task.Delay(1_000)).ConfigureAwait(false);
+
+ Assert.AreSame(task, completedTask);
+
+ return await task.ConfigureAwait(false);
+ }
+
+ private static async Task WaitUntilAsync(Func predicate)
+ {
+ var stopwatch = Stopwatch.StartNew();
+
+ while (stopwatch.ElapsedMilliseconds < 1_000)
+ {
+ if (predicate())
+ return true;
+
+ await Task.Delay(10).ConfigureAwait(false);
+ }
+
+ return predicate();
+ }
+
+ private sealed class TestPoolItem : IPoolable, IDisposable
+ {
+ public TestPoolItem()
+ {
+ }
+
+ public TestPoolItem(int id)
+ {
+ Id = id;
+ }
+
+ public int Id { get; }
+
+ public bool CanBeReused { get; set; } = true;
+
+ public bool IsDisposed { get; private set; }
+
+ public void Dispose()
+ {
+ IsDisposed = true;
+ }
+ }
+ }
+}
diff --git a/src/Octave.NET.Tests/Octave.NET.Tests.csproj b/src/Octave.NET.Tests/Octave.NET.Tests.csproj
index da2a589..a4e4431 100644
--- a/src/Octave.NET.Tests/Octave.NET.Tests.csproj
+++ b/src/Octave.NET.Tests/Octave.NET.Tests.csproj
@@ -1,15 +1,15 @@
- netcoreapp2.0
+ net10.0
false
-
-
-
+
+
+
diff --git a/src/Octave.NET.Tests/OctaveContextTests.cs b/src/Octave.NET.Tests/OctaveContextTests.cs
index 8fdc0ad..afc7163 100644
--- a/src/Octave.NET.Tests/OctaveContextTests.cs
+++ b/src/Octave.NET.Tests/OctaveContextTests.cs
@@ -3,6 +3,9 @@
using System;
using System.Collections.Generic;
using System.Globalization;
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Threading;
using System.Threading.Tasks;
namespace Octave.NET.Tests
@@ -10,14 +13,20 @@ namespace Octave.NET.Tests
[TestClass]
public class OctaveContextTests
{
- private const int Timeout = 2500;
+ private const int TimeoutMS = 2_500;
+ private const int TestTimeoutMS = 10_000;
- public OctaveContextTests()
+ [ClassInitialize]
+ public static void Initialize(TestContext testContext)
{
- OctaveContext.OctaveSettings.PreventColdStarts = true;
+ if (!OctaveTestEnvironment.TryConfigureOctave(out var message))
+ Assert.Inconclusive(message);
+
+ testContext.WriteLine(message);
}
[TestMethod]
+ [Timeout(TestTimeoutMS)]
public void WhenCorrectScript_ShouldReturnString()
{
//arrange
@@ -33,6 +42,7 @@ public void WhenCorrectScript_ShouldReturnString()
}
[TestMethod]
+ [Timeout(TestTimeoutMS)]
public void WhenPassedMaxDoubleValue_ShouldReturnMaxValue()
{
const double input = double.MaxValue;
@@ -41,7 +51,7 @@ public void WhenPassedMaxDoubleValue_ShouldReturnMaxValue()
using (var octave = new OctaveContext())
{
//act
- var result = octave.Execute(input.ToString(CultureInfo.InvariantCulture), Timeout).AsScalar();
+ var result = octave.Execute(input.ToString(CultureInfo.InvariantCulture), TimeoutMS).AsScalar();
//assert
Assert.AreEqual(input, result);
@@ -49,6 +59,7 @@ public void WhenPassedMaxDoubleValue_ShouldReturnMaxValue()
}
[TestMethod]
+ [Timeout(TestTimeoutMS)]
public void WhenPassedMinDoubleValue_ShouldReturnMinValue()
{
const double input = double.MinValue;
@@ -57,7 +68,7 @@ public void WhenPassedMinDoubleValue_ShouldReturnMinValue()
using (var octave = new OctaveContext())
{
//act
- var result = octave.Execute(input.ToString(CultureInfo.InvariantCulture), Timeout).AsScalar();
+ var result = octave.Execute(input.ToString(CultureInfo.InvariantCulture), TimeoutMS).AsScalar();
//assert
Assert.AreEqual(input, result);
@@ -65,6 +76,7 @@ public void WhenPassedMinDoubleValue_ShouldReturnMinValue()
}
[TestMethod]
+ [Timeout(TestTimeoutMS)]
public void WhenPassedDoubleInRange_ShouldReturnCorrectValue()
{
const double input = 15;
@@ -73,7 +85,7 @@ public void WhenPassedDoubleInRange_ShouldReturnCorrectValue()
using (var octave = new OctaveContext())
{
//act
- var result = octave.Execute(input.ToString(CultureInfo.InvariantCulture), Timeout).AsScalar();
+ var result = octave.Execute(input.ToString(CultureInfo.InvariantCulture), TimeoutMS).AsScalar();
//assert
Assert.AreEqual(input, result);
@@ -81,13 +93,14 @@ public void WhenPassedDoubleInRange_ShouldReturnCorrectValue()
}
[TestMethod]
+ [Timeout(TestTimeoutMS)]
public void WhenPassedVectorString_ShouldReturnCorrectVector()
{
//arrange
using (var octave = new OctaveContext())
{
//act
- var result = octave.Execute("[0 1 2 3 4 ]", Timeout).AsVector();
+ var result = octave.Execute("[0 1 2 3 4 ]", TimeoutMS).AsVector();
//assert
CollectionAssert.AreEqual(new double[] { 0, 1, 2, 3, 4 }, result);
@@ -95,13 +108,14 @@ public void WhenPassedVectorString_ShouldReturnCorrectVector()
}
[TestMethod]
+ [Timeout(TestTimeoutMS)]
public void WhenPassedMatrixString_ShouldReturnCorrectMatrix()
{
//arrange
using (var octave = new OctaveContext())
{
//act
- var result = octave.Execute("[0 1 2 3 4 ; 4 3 2 1 0]", Timeout).AsMatrix();
+ var result = octave.Execute("[0 1 2 3 4 ; 4 3 2 1 0]", TimeoutMS).AsMatrix();
//assert
CollectionAssert.AreEqual(new double[] { 0, 1, 2, 3, 4 }, result[0]);
@@ -110,40 +124,447 @@ public void WhenPassedMatrixString_ShouldReturnCorrectMatrix()
}
[TestMethod]
- [ExpectedException(typeof(OctaveScriptError))]
+ [Timeout(TestTimeoutMS)]
public void WhenPassedInvalidScript_ShouldThrowException()
{
//arrange
using (var octave = new OctaveContext())
{
//act
- octave.Execute("'123", Timeout);
+ Assert.ThrowsExactly(() => octave.Execute("'123", TimeoutMS));
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public void WhenSyncScriptWritesToErrorAndContinues_ShouldRetireWorker()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ {
+ octave.Execute("1", TimeoutMS);
+ var workerProcess = GetWorkerProcess(octave);
+
+ //act
+ Assert.ThrowsExactly(() =>
+ octave.Execute("fdisp(stderr, \"bad\"); pause(5)", TimeoutMS));
+
+ var result = octave.Execute("2+2", TimeoutMS).AsScalar();
+
+ //assert
+ Assert.IsTrue(IsWorkerDisposed(workerProcess));
+ Assert.AreEqual(4, result);
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public void WhenSyncScriptWritesToErrorAndCompletesImmediately_ShouldThrowException()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ {
+ //act
+ Assert.ThrowsExactly(() =>
+ octave.Execute("fdisp(stderr, \"bad\")", TimeoutMS));
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public void WhenSyncScriptWritesToErrorWithoutNewLine_ShouldThrowException()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ {
+ //act
+ var exception = Assert.ThrowsExactly(() =>
+ octave.Execute("fprintf(stderr, \"bad\")", TimeoutMS));
+
+ //assert
+ StringAssert.Contains(exception.Message, "bad");
}
}
[TestMethod]
- [ExpectedException(typeof(OctaveCommandTimeoutException))]
+ [Timeout(TestTimeoutMS)]
public void WhenScriptExecutionTakesTooLong_ShouldThrowException()
{
//arrange
using (var octave = new OctaveContext())
{
//act
- octave.Execute("pause(100)", 25);
+ Assert.ThrowsExactly(() => octave.Execute("pause(100)", 25));
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public void WhenSyncTimeoutIsInvalid_ShouldThrowException()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ {
+ //act
+ Assert.ThrowsExactly(() => octave.Execute("1", -2));
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenCorrectScriptExecutedAsync_ShouldReturnString()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ {
+ //act
+ var result = await octave.Execute("123", CancellationToken.None);
+
+ //assert
+ Assert.IsTrue(result.StartsWith("ans"));
+ Assert.IsTrue(result.EndsWith("123"));
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenAsyncScriptWritesToError_ShouldThrowException()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ {
+ //act
+ await Assert.ThrowsExactlyAsync(() =>
+ octave.Execute("fdisp(stderr, \"bad\"); pause(.1)", CancellationToken.None));
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenAsyncScriptWritesToErrorAndCompletesImmediately_ShouldThrowException()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ {
+ //act
+ await Assert.ThrowsExactlyAsync(() =>
+ octave.Execute("fdisp(stderr, \"bad\")", CancellationToken.None));
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenAsyncScriptWritesToErrorWithoutNewLine_ShouldThrowException()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ {
+ //act
+ var exception = await Assert.ThrowsExactlyAsync(() =>
+ octave.Execute("fprintf(stderr, \"bad\")", CancellationToken.None));
+
+ //assert
+ StringAssert.Contains(exception.Message, "bad");
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenAsyncScriptWritesMultipleErrorLines_ShouldIncludeCollectedError()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ {
+ //act
+ var exception = await Assert.ThrowsExactlyAsync(() =>
+ octave.Execute("fprintf(stderr, \"first\\nsecond\\n\"); pause(.1)", CancellationToken.None));
+
+ //assert
+ StringAssert.Contains(exception.Message, "first");
+ StringAssert.Contains(exception.Message, "second");
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenAsyncScriptExecutionIsCanceled_ShouldThrowException()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ using (var cancellationTokenSource = new CancellationTokenSource())
+ {
+ cancellationTokenSource.CancelAfter(25);
+
+ //act
+ await Assert.ThrowsAsync(() =>
+ octave.Execute("pause(100)", cancellationTokenSource.Token));
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenAsyncScriptExecutionIsCanceled_ShouldPreserveCancellationToken()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ using (var cancellationTokenSource = new CancellationTokenSource())
+ {
+ cancellationTokenSource.CancelAfter(25);
+
+ //act
+ var exception = await Assert.ThrowsAsync(() =>
+ octave.Execute("pause(100)", cancellationTokenSource.Token));
+
+ //assert
+ Assert.AreEqual(cancellationTokenSource.Token, exception.CancellationToken);
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenAsyncOctaveProcessExits_ShouldThrowException()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ {
+ var executionTask = octave.Execute("pause(100); 42", CancellationToken.None);
+
+ await Task.Delay(50);
+
+ var workerProcess = GetWorkerProcess(octave);
+
+ //act
+ workerProcess.Kill();
+
+ //assert
+ await Assert.ThrowsExactlyAsync(() => executionTask);
}
}
[TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public void WhenSyncOctaveProcessExits_ShouldThrowExceptionAndRecover()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ {
+ //act
+ Assert.ThrowsExactly(() => octave.Execute("exit", TimeoutMS));
+
+ var result = octave.Execute("2+2", TimeoutMS).AsScalar();
+
+ //assert
+ Assert.AreEqual(4, result);
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenAsyncCommandIsEmptyAndCanceled_ShouldThrowException()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ using (var cancellationTokenSource = new CancellationTokenSource())
+ {
+ cancellationTokenSource.Cancel();
+
+ //act
+ await Assert.ThrowsExactlyAsync(() =>
+ octave.Execute("", cancellationTokenSource.Token));
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenSameContextExecutesConcurrently_ShouldSerializeCommands()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ {
+ var firstTask = Task.Run(() => octave.Execute("pause(.25); 11", TimeoutMS));
+ var secondTask = Task.Run(() => octave.Execute("22", TimeoutMS));
+
+ //act
+ var results = await Task.WhenAll(firstTask, secondTask);
+
+ //assert
+ Assert.IsTrue(results[0].EndsWith("11"));
+ Assert.IsTrue(results[1].EndsWith("22"));
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenSameContextExecutesSyncAndAsyncConcurrently_ShouldSerializeCommands()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ {
+ var asyncTask = octave.Execute("pause(.25); 33", CancellationToken.None);
+ var syncTask = Task.Run(() => octave.Execute("44", TimeoutMS));
+
+ //act
+ var results = await Task.WhenAll(asyncTask, syncTask);
+
+ //assert
+ Assert.IsTrue(results[0].EndsWith("33"));
+ Assert.IsTrue(results[1].EndsWith("44"));
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenDisposed_ShouldRejectExecute()
+ {
+ //arrange
+ var octave = new OctaveContext();
+
+ //act
+ octave.Dispose();
+
+ //assert
+ Assert.ThrowsExactly(() => octave.Execute("1", TimeoutMS));
+ await Assert.ThrowsExactlyAsync(() => octave.Execute("1", CancellationToken.None));
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public void WhenContextIsAbandoned_ShouldReleasePoolLeaseDuringFinalization()
+ {
+ //arrange
+ using (var warmup = new OctaveContext())
+ {
+ warmup.Execute("1", TimeoutMS);
+ }
+
+ var availableLeasesBefore = GetAvailablePoolLeaseCount();
+
+ CreateAbandonedContext();
+
+ //act
+ GC.Collect();
+ GC.WaitForPendingFinalizers();
+ GC.Collect();
+
+ //assert
+ Assert.AreEqual(availableLeasesBefore, GetAvailablePoolLeaseCount());
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenDisposedDuringAsyncExecution_ShouldInterruptActiveCommand()
+ {
+ //arrange
+ var octave = new OctaveContext();
+ var executionTask = octave.Execute("pause(100); 42", CancellationToken.None);
+
+ await Task.Delay(50);
+
+ //act
+ var disposeTask = Task.Run(() => octave.Dispose());
+
+ //assert
+ await disposeTask;
+
+ await Assert.ThrowsExactlyAsync(() => executionTask);
+ await Assert.ThrowsExactlyAsync(() => octave.Execute("1", CancellationToken.None));
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenDisposedWithQueuedExecution_ShouldRejectQueuedExecution()
+ {
+ //arrange
+ var octave = new OctaveContext();
+ var activeTask = octave.Execute("pause(.5); 5", CancellationToken.None);
+ var queuedTask = Task.Run(() => octave.Execute("6", TimeoutMS));
+
+ await Task.Delay(50);
+
+ //act
+ var disposeTask = Task.Run(() => octave.Dispose());
+
+ //assert
+ await disposeTask;
+ await Assert.ThrowsExactlyAsync(() => activeTask);
+ await Assert.ThrowsExactlyAsync(() => queuedTask);
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public void WhenDisposedAfterSuccessfulExecution_ShouldLeaveWorkerReusable()
+ {
+ //arrange
+ var octave = new OctaveContext();
+
+ octave.Execute("1", TimeoutMS);
+
+ var workerProcess = GetWorkerProcess(octave);
+
+ //act
+ octave.Dispose();
+
+ //assert
+ Assert.IsTrue(workerProcess.CanBeReused);
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public void WhenWorkerWriteFailsWithNonIOException_ShouldRetireWorkerAndRecover()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ {
+ octave.Execute("1", TimeoutMS);
+
+ GetWorkerProcess(octave).Dispose();
+
+ //act
+ Assert.ThrowsExactly(() => octave.Execute("2", TimeoutMS));
+
+ var result = octave.Execute("3", TimeoutMS).AsScalar();
+
+ //assert
+ Assert.AreEqual(3, result);
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
+ public async Task WhenAsyncWorkerWriteFailsWithNonIOException_ShouldClearActiveStopAndRecover()
+ {
+ //arrange
+ using (var octave = new OctaveContext())
+ {
+ octave.Execute("1", TimeoutMS);
+
+ GetWorkerProcess(octave).Dispose();
+
+ //act
+ await Assert.ThrowsExactlyAsync(() => octave.Execute("2", CancellationToken.None));
+
+ var result = (await octave.Execute("3", CancellationToken.None)).AsScalar();
+
+ //assert
+ Assert.IsNull(GetActiveExecutionStop(octave));
+ Assert.AreEqual(3, result);
+ }
+ }
+
+ [TestMethod]
+ [Timeout(TestTimeoutMS)]
public void WhenHeavilyMultithreaded_ThrowsNoExceptions()
{
var tasks = new List();
+ var workerCount = Math.Min(Environment.ProcessorCount, 4);
+ const int iterationsPerWorker = 10;
//arrange
- for (var i = 0; i < Environment.ProcessorCount * 2; i++)
+ for (var i = 0; i < workerCount; i++)
{
var task = Task.Run(() =>
{
- for (var j = 0; j < 100; j++)
+ for (var j = 0; j < iterationsPerWorker; j++)
{
using (var octave = new OctaveContext())
{
@@ -161,5 +582,50 @@ public void WhenHeavilyMultithreaded_ThrowsNoExceptions()
Task.WaitAll(tasks.ToArray());
}
+
+ private static OctaveProcess GetWorkerProcess(OctaveContext octave)
+ {
+ var workerProcessField = typeof(OctaveContext).GetField("workerProcess",
+ BindingFlags.Instance | BindingFlags.NonPublic);
+
+ return (OctaveProcess)workerProcessField.GetValue(octave);
+ }
+
+ private static bool IsWorkerDisposed(OctaveProcess workerProcess)
+ {
+ var isDisposedField = typeof(OctaveProcess).GetField("isDisposed",
+ BindingFlags.Instance | BindingFlags.NonPublic);
+
+ return (bool)isDisposedField.GetValue(workerProcess);
+ }
+
+ private static Action GetActiveExecutionStop(OctaveContext octave)
+ {
+ var activeExecutionStopField = typeof(OctaveContext).GetField("activeExecutionStop",
+ BindingFlags.Instance | BindingFlags.NonPublic);
+
+ return (Action)activeExecutionStopField.GetValue(octave);
+ }
+
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ private static void CreateAbandonedContext()
+ {
+ var octave = new OctaveContext();
+
+ octave.Execute("1", TimeoutMS);
+ }
+
+ private static int GetAvailablePoolLeaseCount()
+ {
+ var processPoolField = typeof(OctaveContext).GetField("processPool",
+ BindingFlags.Static | BindingFlags.NonPublic);
+ var processPool = processPoolField.GetValue(null);
+
+ var concurrencySemaphoreField = processPool.GetType().GetField("concurrencySemaphore",
+ BindingFlags.Instance | BindingFlags.NonPublic);
+ var concurrencySemaphore = (SemaphoreSlim)concurrencySemaphoreField.GetValue(processPool);
+
+ return concurrencySemaphore.CurrentCount;
+ }
}
-}
\ No newline at end of file
+}
diff --git a/src/Octave.NET.Tests/OctaveDoubleExtensionsTests.cs b/src/Octave.NET.Tests/OctaveDoubleExtensionsTests.cs
index bf40d9f..85448e8 100644
--- a/src/Octave.NET.Tests/OctaveDoubleExtensionsTests.cs
+++ b/src/Octave.NET.Tests/OctaveDoubleExtensionsTests.cs
@@ -15,7 +15,7 @@ public void VectorToOctave_ReturnsCorrectResult()
var res = input.ToOctave();
//assert
- Assert.AreEqual(res, "[1 2 3]");
+ Assert.AreEqual("[1 2 3]", res);
}
[TestMethod]
@@ -28,7 +28,7 @@ public void EmptyVectorToOctave_ReturnsCorrectResult()
var res = input.ToOctave();
//assert
- Assert.AreEqual(res, "[]");
+ Assert.AreEqual("[]", res);
}
[TestMethod]
@@ -45,7 +45,7 @@ public void MatrixToOctave_ReturnsCorrectResult()
var res = input.ToOctave();
//assert
- Assert.AreEqual(res, "[1 2 3;3 2 1]");
+ Assert.AreEqual("[1 2 3;3 2 1]", res);
}
[TestMethod]
@@ -58,7 +58,7 @@ public void EmptyMatrixToOctave_ReturnsCorrectResult()
var res = input.ToOctave();
//assert
- Assert.AreEqual(res, "[]");
+ Assert.AreEqual("[]", res);
}
}
}
diff --git a/src/Octave.NET.Tests/OctaveStringExtensionsTests.cs b/src/Octave.NET.Tests/OctaveStringExtensionsTests.cs
index 430fbfb..c06a8da 100644
--- a/src/Octave.NET.Tests/OctaveStringExtensionsTests.cs
+++ b/src/Octave.NET.Tests/OctaveStringExtensionsTests.cs
@@ -16,7 +16,7 @@ public void StringAsScalar_ReturnsCorrectResult()
var res = input.AsScalar();
//assert
- Assert.AreEqual(res, 34);
+ Assert.AreEqual(34, res);
}
[TestMethod]
@@ -29,7 +29,7 @@ public void StringInfinityAsScalar_ReturnsCorrectResult()
var res = input.AsScalar();
//assert
- Assert.AreEqual(res, double.MaxValue);
+ Assert.AreEqual(double.MaxValue, res);
}
[TestMethod]
@@ -42,7 +42,33 @@ public void StringMinusInfinityAsScalar_ReturnsCorrectResult()
var res = input.AsScalar();
//assert
- Assert.AreEqual(res, double.MinValue);
+ Assert.AreEqual(double.MinValue, res);
+ }
+
+ [TestMethod]
+ public void StringOverflowAsScalar_ReturnsMaxValue()
+ {
+ //arrange
+ var input = "ans = 1.7977e+308";
+
+ //act
+ var res = input.AsScalar();
+
+ //assert
+ Assert.AreEqual(double.MaxValue, res);
+ }
+
+ [TestMethod]
+ public void StringNegativeOverflowAsScalar_ReturnsMinValue()
+ {
+ //arrange
+ var input = "ans = -1.7977e+308";
+
+ //act
+ var res = input.AsScalar();
+
+ //assert
+ Assert.AreEqual(double.MinValue, res);
}
[TestMethod]
@@ -55,10 +81,10 @@ public void StringAsVector_ReturnsCorrectResult()
var res = input.AsVector();
//assert
- Assert.AreEqual(res.Length, 3);
- Assert.AreEqual(res[0], 1);
- Assert.AreEqual(res[1], 2);
- Assert.AreEqual(res[2], 3);
+ Assert.AreEqual(3, res.Length);
+ Assert.AreEqual(1, res[0]);
+ Assert.AreEqual(2, res[1]);
+ Assert.AreEqual(3, res[2]);
}
[TestMethod]
@@ -71,10 +97,10 @@ public void StringWithInfinitiesAsVector_ReturnsCorrectResult()
var res = input.AsVector();
//assert
- Assert.AreEqual(res.Length, 3);
- Assert.AreEqual(res[0], 1);
- Assert.AreEqual(res[1], double.MinValue);
- Assert.AreEqual(res[2], double.MaxValue);
+ Assert.AreEqual(3, res.Length);
+ Assert.AreEqual(1, res[0]);
+ Assert.AreEqual(double.MinValue, res[1]);
+ Assert.AreEqual(double.MaxValue, res[2]);
}
[TestMethod]
@@ -87,15 +113,15 @@ public void StringAsMatrix_ReturnsCorrectResult()
var res = input.AsMatrix();
//assert
- Assert.AreEqual(res.Length, 2);
- Assert.AreEqual(res[0].Length, 3);
- Assert.AreEqual(res[0][0], 4);
- Assert.AreEqual(res[0][1], 5);
- Assert.AreEqual(res[0][2], 6);
- Assert.AreEqual(res[1].Length, 3);
- Assert.AreEqual(res[1][0], 7);
- Assert.AreEqual(res[1][1], 8);
- Assert.AreEqual(res[1][2], 9);
+ Assert.AreEqual(2, res.Length);
+ Assert.AreEqual(3, res[0].Length);
+ Assert.AreEqual(4, res[0][0]);
+ Assert.AreEqual(5, res[0][1]);
+ Assert.AreEqual(6, res[0][2]);
+ Assert.AreEqual(3, res[1].Length);
+ Assert.AreEqual(7, res[1][0]);
+ Assert.AreEqual(8, res[1][1]);
+ Assert.AreEqual(9, res[1][2]);
}
}
}
diff --git a/src/Octave.NET.Tests/OctaveTestEnvironment.cs b/src/Octave.NET.Tests/OctaveTestEnvironment.cs
new file mode 100644
index 0000000..06f0ae9
--- /dev/null
+++ b/src/Octave.NET.Tests/OctaveTestEnvironment.cs
@@ -0,0 +1,107 @@
+using System;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+
+namespace Octave.NET.Tests
+{
+ internal static class OctaveTestEnvironment
+ {
+ private const string OctaveCliEnvironmentVariable = "OCTAVE_CLI_PATH";
+
+ public static bool TryConfigureOctave(out string message)
+ {
+ var octaveCliPath = FindOctaveCliPath();
+
+ if (octaveCliPath == null)
+ {
+ message = $"Octave CLI was not found. Add octave-cli to PATH or set {OctaveCliEnvironmentVariable} to the full octave-cli executable path.";
+ return false;
+ }
+
+ OctaveContext.OctaveSettings = new OctaveSettings
+ {
+ OctaveCliPath = octaveCliPath,
+ PreventColdStarts = true
+ };
+
+ message = $"Using Octave CLI at '{octaveCliPath}'.";
+ return true;
+ }
+
+ private static string FindOctaveCliPath()
+ {
+ var configuredPath = Environment.GetEnvironmentVariable(OctaveCliEnvironmentVariable);
+
+ if (File.Exists(configuredPath))
+ return configuredPath;
+
+ var pathOctaveCli = FindOnPath(GetExecutableFileName());
+
+ if (pathOctaveCli != null)
+ return pathOctaveCli;
+
+ return IsWindows()
+ ? FindWindowsOctaveCli()
+ : null;
+ }
+
+ private static string FindOnPath(string executableFileName)
+ {
+ var path = Environment.GetEnvironmentVariable("PATH");
+
+ if (string.IsNullOrEmpty(path))
+ return null;
+
+ return path.Split(Path.PathSeparator)
+ .Where(directory => !string.IsNullOrWhiteSpace(directory))
+ .Select(directory => Path.Combine(directory.Trim(), executableFileName))
+ .FirstOrDefault(File.Exists);
+ }
+
+ private static string FindWindowsOctaveCli()
+ {
+ var roots = new[]
+ {
+ Environment.GetFolderPath(Environment.SpecialFolder.ProgramFiles),
+ Environment.GetFolderPath(Environment.SpecialFolder.ProgramFilesX86)
+ };
+
+ return roots
+ .Where(root => !string.IsNullOrWhiteSpace(root))
+ .SelectMany(root => new[]
+ {
+ Path.Combine(root, "GNU Octave"),
+ Path.Combine(root, "Octave")
+ })
+ .Where(Directory.Exists)
+ .SelectMany(root => Directory.EnumerateFiles(root, "octave-cli.exe", SearchOption.AllDirectories))
+ .OrderByDescending(GetFileVersion)
+ .FirstOrDefault();
+ }
+
+ private static Version GetFileVersion(string path)
+ {
+ var version = FileVersionInfo.GetVersionInfo(path).FileVersion;
+ return Version.TryParse(version, out var parsedVersion)
+ ? parsedVersion
+ : new Version();
+ }
+
+ private static string GetExecutableFileName()
+ {
+ return IsWindows()
+ ? "octave-cli.exe"
+ : "octave-cli";
+ }
+
+ private static bool IsWindows()
+ {
+ var platform = Environment.OSVersion.Platform;
+ return platform == PlatformID.Win32NT ||
+ platform == PlatformID.Win32S ||
+ platform == PlatformID.Win32Windows ||
+ platform == PlatformID.WinCE;
+ }
+ }
+}
diff --git a/src/Octave.NET/Core/ObjectPooling/ObjectPool.cs b/src/Octave.NET/Core/ObjectPooling/ObjectPool.cs
index a72b120..4959b69 100644
--- a/src/Octave.NET/Core/ObjectPooling/ObjectPool.cs
+++ b/src/Octave.NET/Core/ObjectPooling/ObjectPool.cs
@@ -6,7 +6,7 @@
namespace Octave.NET.Core.ObjectPooling
{
- internal interface IObjectPool where T : IPoolable
+ internal interface IObjectPool : IDisposable where T : IPoolable
{
T GetObject();
void ReleaseObject(T obj);
@@ -15,26 +15,32 @@ internal interface IObjectPool where T : IPoolable
internal class ObjectPool : IObjectPool where T : IPoolable
{
private readonly Func createObjectFunc;
+ private readonly object syncRoot = new object();
private readonly ConcurrentBag internalPool = new ConcurrentBag();
private readonly int MaxConcurrency = Environment.ProcessorCount;
private readonly int PoolIdleThresholdMs = 2500;
private readonly int ReclaimTaskSleepTimeMs;
private readonly Stopwatch watch;
-
- private int aliveObjects;
+ private readonly SemaphoreSlim concurrencySemaphore;
+ private int disposed;
private CancellationTokenSource reclaimTaskCancellationTokenSource;
+ private Task reclaimTask;
public ObjectPool(Func createObjectFunc, int reclaimTaskSleepTime, bool reclaimWhenIdle = true,
- int? maxConcurrency = null)
+ int? maxConcurrency = null, int? poolIdleThresholdMs = null)
{
ReclaimTaskSleepTimeMs = reclaimTaskSleepTime;
if (maxConcurrency != null && maxConcurrency > 0)
MaxConcurrency = maxConcurrency.Value;
+ if (poolIdleThresholdMs != null && poolIdleThresholdMs > 0)
+ PoolIdleThresholdMs = poolIdleThresholdMs.Value;
+
this.createObjectFunc = createObjectFunc ?? throw new ArgumentNullException(nameof(createObjectFunc));
+ concurrencySemaphore = new SemaphoreSlim(MaxConcurrency, MaxConcurrency);
if (!reclaimWhenIdle) return;
@@ -44,52 +50,77 @@ public ObjectPool(Func createObjectFunc, int reclaimTaskSleepTime, bool recla
StartReclaimTask();
}
-
public T GetObject()
{
- lock (createObjectFunc)
- {
- watch?.Restart();
+ ThrowIfDisposed();
+ // The semaphore represents checked-out pool leases. Waiting here
+ // replaces the old Thread.Sleep polling when the pool is at capacity.
+ concurrencySemaphore.Wait();
- var limitReached = false;
+ try
+ {
+ ThrowIfDisposed();
- do
+ lock (syncRoot)
{
- if (internalPool.TryTake(out var item))
+ ThrowIfDisposed();
+ watch?.Restart();
+
+ while (internalPool.TryTake(out var item))
{
if (item.CanBeReused) return item;
(item as IDisposable)?.Dispose();
- aliveObjects--;
}
- limitReached = aliveObjects >= MaxConcurrency;
-
- if (limitReached)
- Thread.Sleep(1);
- } while (limitReached);
-
- aliveObjects++;
- return createObjectFunc();
+ return createObjectFunc();
+ }
+ }
+ catch
+ {
+ ReleaseLease();
+ throw;
}
}
public void ReleaseObject(T obj)
{
- internalPool.Add(obj);
+ try
+ {
+ var shouldDispose = Volatile.Read(ref disposed) == 1 || !obj.CanBeReused;
+
+ if (!shouldDispose)
+ {
+ lock (syncRoot)
+ {
+ shouldDispose = Volatile.Read(ref disposed) == 1 || !obj.CanBeReused;
+
+ if (!shouldDispose)
+ {
+ internalPool.Add(obj);
+ return;
+ }
+ }
+ }
+
+ (obj as IDisposable)?.Dispose();
+ }
+ finally
+ {
+ ReleaseLease();
+ }
}
~ObjectPool()
{
- foreach (var item in internalPool)
- (item as IDisposable)?.Dispose();
-
- if (reclaimTaskCancellationTokenSource == null) return;
-
- reclaimTaskCancellationTokenSource.Cancel();
- reclaimTaskCancellationTokenSource.Dispose();
+ Dispose(false);
}
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
private void StartReclaimTask()
{
@@ -97,26 +128,92 @@ private void StartReclaimTask()
var cancellationToken = reclaimTaskCancellationTokenSource.Token;
- Task.Run(() =>
+ reclaimTask = Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
- Thread.Sleep(ReclaimTaskSleepTimeMs);
-
- if (internalPool.IsEmpty || watch.ElapsedMilliseconds <= PoolIdleThresholdMs) continue;
+ try
+ {
+ // Use a cancel-aware delay so shutdown does not have to wait
+ // for the full reclaim interval to elapse.
+ await Task.Delay(ReclaimTaskSleepTimeMs, cancellationToken).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ return;
+ }
- var success = internalPool.TryTake(out var item);
+ T item;
+ bool success;
- if (success)
+ lock (syncRoot)
{
- (item as IDisposable)?.Dispose();
- aliveObjects--;
- }
+ if (internalPool.IsEmpty || watch.ElapsedMilliseconds <= PoolIdleThresholdMs)
+ continue;
+ success = internalPool.TryTake(out item);
+ watch?.Restart();
+ }
- watch?.Restart();
+ if (success)
+ (item as IDisposable)?.Dispose();
}
}, cancellationToken);
}
+
+ private void Dispose(bool disposing)
+ {
+ if (Interlocked.Exchange(ref disposed, 1) == 1)
+ return;
+
+ reclaimTaskCancellationTokenSource?.Cancel();
+
+ if (disposing)
+ {
+ // Explicit disposal can safely wait for the background reclaim
+ // loop. The finalizer intentionally skips this wait so it never
+ // blocks the runtime finalizer thread.
+ WaitForReclaimTask();
+ reclaimTaskCancellationTokenSource?.Dispose();
+ }
+
+ lock (syncRoot)
+ {
+ while (internalPool.TryTake(out var item))
+ (item as IDisposable)?.Dispose();
+ }
+
+ if (disposing)
+ concurrencySemaphore.Dispose();
+ }
+
+ private void WaitForReclaimTask()
+ {
+ try
+ {
+ reclaimTask?.Wait();
+ }
+ catch (AggregateException exception)
+ {
+ exception.Handle(innerException => innerException is OperationCanceledException);
+ }
+ }
+
+ private void ThrowIfDisposed()
+ {
+ if (Volatile.Read(ref disposed) == 1)
+ throw new ObjectDisposedException(GetType().FullName);
+ }
+
+ private void ReleaseLease()
+ {
+ try
+ {
+ concurrencySemaphore.Release();
+ }
+ catch (ObjectDisposedException)
+ {
+ }
+ }
}
-}
\ No newline at end of file
+}
diff --git a/src/Octave.NET/Octave.NET.csproj b/src/Octave.NET/Octave.NET.csproj
index ac0e04c..10e288c 100644
--- a/src/Octave.NET/Octave.NET.csproj
+++ b/src/Octave.NET/Octave.NET.csproj
@@ -1,16 +1,11 @@
- netstandard1.3
+ netstandard2.0;net10.0
- bin\Release\netstandard2.0\Octave.NET.xml
+ bin\Release\$(TargetFramework)\Octave.NET.xml
-
-
-
-
-
diff --git a/src/Octave.NET/OctaveContext.cs b/src/Octave.NET/OctaveContext.cs
index 688aa0a..38c64b2 100644
--- a/src/Octave.NET/OctaveContext.cs
+++ b/src/Octave.NET/OctaveContext.cs
@@ -1,8 +1,10 @@
using Octave.NET.Core.Exceptions;
using Octave.NET.Core.ObjectPooling;
using System;
+using System.IO;
using System.Text;
using System.Threading;
+using System.Threading.Tasks;
namespace Octave.NET
{
@@ -16,8 +18,16 @@ public interface IOctaveContext
///
public partial class OctaveContext : IDisposable, IOctaveContext
{
- private const int CommandTimeoutMilliseconds = 30000;
+ private const int CommandTimeoutMilliseconds = 30_000;
+ private const int ErrorCollectionDelayMilliseconds = 25;
+ private readonly object activeExecutionLock = new object();
+ private readonly object executionLifecycleLock = new object();
+ private readonly ManualResetEventSlim noExecutionWaiters = new ManualResetEventSlim(true);
+ private readonly SemaphoreSlim executionSemaphore = new SemaphoreSlim(1, 1);
+ private Action activeExecutionStop;
private OctaveProcess workerProcess;
+ private int disposed;
+ private int executionWaiterCount;
public OctaveContext()
{
@@ -43,108 +53,746 @@ public OctaveContext()
///
public string Execute(string command, int timeout = CommandTimeoutMilliseconds)
{
+ ThrowIfDisposed();
+
+ if (timeout < Timeout.Infinite)
+ throw new ArgumentOutOfRangeException(nameof(timeout));
+
if (string.IsNullOrEmpty(command))
return "";
- if (this.workerProcess == null)
- Initialize();
+ EnterExecution();
- var localFinishToken = Guid.NewGuid().ToString();
- var hasError = false;
+ var semaphoreAcquired = false;
- var data = new StringBuilder();
- var error = new StringBuilder();
+ try
+ {
+ executionSemaphore.Wait();
+ semaphoreAcquired = true;
+ ThrowIfDisposed();
+ return ExecuteCore(command, timeout);
+ }
+ finally
+ {
+ if (semaphoreAcquired)
+ executionSemaphore.Release();
- var localMre = new AutoResetEvent(false);
+ LeaveExecution();
+ }
+ }
- DataEventHandler LocalOnData = (object sender, string dataStr) =>
+ private string ExecuteCore(string command, int timeout)
+ {
+ var currentWorkerProcess = GetOrInitializeWorkerProcess();
+ var commandExecution = new CommandExecutionState();
+ var commandExecutionGate = new object();
+ var disposeRequested = 0;
+ var commandStarted = 0;
+ var hasError = 0;
+ var processExited = 0;
+ var callbackSignalsEnabled = 1;
+ var startupGate = new object();
+
+ using (var localMre = new AutoResetEvent(false))
{
- if (dataStr == null) return;
+ bool HasTerminalResult()
+ {
+ lock (commandExecutionGate)
+ {
+ return commandExecution.IsComplete ||
+ Volatile.Read(ref hasError) == 1 ||
+ Volatile.Read(ref processExited) == 1;
+ }
+ }
- data.Append(dataStr);
- data.Append(Environment.NewLine);
+ void SignalCompletion()
+ {
+ if (Volatile.Read(ref callbackSignalsEnabled) == 0)
+ return;
+
+ try
+ {
+ localMre.Set();
+ }
+ catch (ObjectDisposedException)
+ {
+ }
+ }
+
+ void LocalOnData(object sender, string dataStr)
+ {
+ lock (commandExecutionGate)
+ {
+ if (commandExecution.TryReadResponse(dataStr) && commandExecution.IsComplete)
+ SignalCompletion();
+ }
+ }
- if (!dataStr.Contains(localFinishToken)) return;
+ void LocalOnError(object sender, string errorStr)
+ {
+ lock (commandExecutionGate)
+ {
+ if (!commandExecution.TryRecordError(errorStr, out var isError))
+ return;
+
+ if (isError)
+ {
+ Interlocked.Exchange(ref hasError, 1);
+ SignalCompletion();
+ }
+ else if (commandExecution.IsComplete)
+ {
+ SignalCompletion();
+ }
+ }
+ }
- data.Length -= Environment.NewLine.Length;
- var finishTokenAnsLenght = "ans = ".Length + localFinishToken.Length;
+ void StopForDispose()
+ {
+ lock (startupGate)
+ {
+ // Once stdout/stderr/exit has already completed the command,
+ // Dispose should not change the result into ObjectDisposedException.
+ if (Volatile.Read(ref callbackSignalsEnabled) == 0 || HasTerminalResult())
+ return;
+
+ if (Interlocked.Exchange(ref disposeRequested, 1) == 1)
+ return;
+
+ if (Volatile.Read(ref commandStarted) == 1)
+ TryInterruptOrRetireWorker(currentWorkerProcess);
+ else
+ currentWorkerProcess.Retire();
+
+ SignalCompletion();
+ }
+ }
+
+ void LocalOnExit(object sender, EventArgs eventArgs)
+ {
+ if (Interlocked.Exchange(ref processExited, 1) == 1)
+ return;
+
+ currentWorkerProcess.Retire();
+ SignalCompletion();
+ }
- data.Length -= Math.Min(data.Length, finishTokenAnsLenght);
- localMre.Set();
+ try
+ {
+ AttachHandlers(currentWorkerProcess, LocalOnData, LocalOnError);
+ AttachExitHandler(currentWorkerProcess, LocalOnExit);
+
+ // Register the dispose stop before stdin writes begin. Otherwise
+ // Dispose can miss a command that is just about to start.
+ SetActiveExecutionStop(StopForDispose);
+
+ lock (startupGate)
+ {
+ if (Volatile.Read(ref disposed) == 1 || Volatile.Read(ref disposeRequested) == 1)
+ throw CreateObjectDisposedException();
+
+ Volatile.Write(ref commandStarted, 1);
+ WriteCommand(currentWorkerProcess, command, commandExecution);
+ }
+
+ var isDone = localMre.WaitOne(timeout);
+ Interlocked.Exchange(ref callbackSignalsEnabled, 0);
+ ClearActiveExecutionStop(StopForDispose);
+
+ if (Volatile.Read(ref disposeRequested) == 1)
+ {
+ RetireAndUnloadWorker(currentWorkerProcess);
+ throw CreateObjectDisposedException();
+ }
+
+ var commandIsComplete = false;
+ var commandHasError = Volatile.Read(ref hasError) == 1;
+
+ lock (commandExecutionGate)
+ {
+ commandIsComplete = commandExecution.IsComplete;
+ }
+
+ var commandCompletedSuccessfully = commandIsComplete && !commandHasError;
+
+ if (Volatile.Read(ref processExited) == 1 && !commandCompletedSuccessfully)
+ {
+ RetireAndUnloadWorker(currentWorkerProcess);
+ throw new OctaveScriptError(commandExecution.Error,
+ new InvalidOperationException("The Octave process exited before the command completed."));
+ }
+
+ if (!isDone && !commandIsComplete && !commandHasError)
+ CommandTimeout(currentWorkerProcess);
+
+ if (commandHasError)
+ {
+ TryInterruptOrRetireWorker(currentWorkerProcess);
+ throw new IOException();
+ }
+
+ return commandExecution.Response;
+ }
+ catch (IOException exception)
+ {
+ Thread.Sleep(ErrorCollectionDelayMilliseconds); // wait in case collected data is incomplete
+
+ DetachHandlers(currentWorkerProcess, LocalOnData, LocalOnError);
+ DetachExitHandler(currentWorkerProcess, LocalOnExit);
+ RetireAndUnloadWorker(currentWorkerProcess);
+
+ throw new OctaveScriptError(commandExecution.Error, exception);
+ }
+ catch (Exception exception) when (ShouldWrapExecutionException(exception))
+ {
+ DetachHandlers(currentWorkerProcess, LocalOnData, LocalOnError);
+ DetachExitHandler(currentWorkerProcess, LocalOnExit);
+ RetireAndUnloadWorker(currentWorkerProcess);
+
+ throw new OctaveScriptError(commandExecution.Error, exception);
+ }
+ finally
+ {
+ Interlocked.Exchange(ref callbackSignalsEnabled, 0);
+ ClearActiveExecutionStop(StopForDispose);
+ DetachHandlers(currentWorkerProcess, LocalOnData, LocalOnError);
+ DetachExitHandler(currentWorkerProcess, LocalOnExit);
+
+ if (workerProcess != null && !currentWorkerProcess.CanBeReused)
+ UnloadWorkerProcess();
+ }
+ }
+ }
+
+ ///
+ /// Execute command and return raw response.
+ ///
+ ///
+ ///
+ ///
+ ///
+ public async Task Execute(string command, CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ cancellationToken.ThrowIfCancellationRequested();
+
+ if (string.IsNullOrEmpty(command))
+ return "";
+
+ EnterExecution();
+
+ var semaphoreAcquired = false;
+
+ try
+ {
+ await executionSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
+ semaphoreAcquired = true;
+ ThrowIfDisposed();
+ return await ExecuteCoreAsync(command, cancellationToken).ConfigureAwait(false);
+ }
+ finally
+ {
+ if (semaphoreAcquired)
+ executionSemaphore.Release();
+
+ LeaveExecution();
+ }
+ }
+
+ private async Task ExecuteCoreAsync(string command, CancellationToken cancellationToken)
+ {
+ var currentWorkerProcess = GetOrInitializeWorkerProcess();
+ var commandExecution = new CommandExecutionState();
+ var commandExecutionGate = new object();
+ var completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var cancellationRequested = 0;
+ var commandStarted = 0;
+ var hasError = 0;
+ var cancellationGate = new object();
+ Action stopForDispose = () =>
+ {
+ lock (cancellationGate)
+ {
+ CompleteForDispose();
+ }
};
- DataEventHandler LocalOnError = (object sender, string errorStr) =>
+ bool HasTerminalResult()
{
- if (errorStr == null) return;
+ lock (commandExecutionGate)
+ {
+ return commandExecution.IsComplete ||
+ Volatile.Read(ref hasError) == 1;
+ }
+ }
- hasError = true;
+ void CompleteCancellation()
+ {
+ if (completionSource.Task.IsCompleted)
+ return;
- error.Append(errorStr + Environment.NewLine);
+ if (HasTerminalResult())
+ return;
- localMre.Set();
- };
+ if (Interlocked.Exchange(ref cancellationRequested, 1) == 1)
+ return;
+
+ try
+ {
+ // Only send CTRL-C after the command has been written. If
+ // cancellation wins before stdin receives the command, retire
+ // the process without signaling so the command cannot start later.
+ if (Volatile.Read(ref commandStarted) == 1)
+ TryInterruptOrRetireWorker(currentWorkerProcess);
+ else
+ currentWorkerProcess.Retire();
+
+ completionSource.TrySetCanceled(cancellationToken);
+ }
+ catch (Exception exception)
+ {
+ currentWorkerProcess.Retire();
+
+ completionSource.TrySetException(new OctaveScriptError(commandExecution.Error, exception));
+ }
+ }
+
+ void CompleteForDispose()
+ {
+ if (completionSource.Task.IsCompleted)
+ return;
+
+ if (HasTerminalResult())
+ return;
+
+ Interlocked.Exchange(ref cancellationRequested, 1);
+
+ if (Volatile.Read(ref commandStarted) == 1)
+ TryInterruptOrRetireWorker(currentWorkerProcess);
+ else
+ currentWorkerProcess.Retire();
+
+ completionSource.TrySetException(CreateObjectDisposedException());
+ }
+
+ void TryCompleteSuccessfully()
+ {
+ string response;
+
+ lock (commandExecutionGate)
+ {
+ if (!commandExecution.IsComplete)
+ return;
+
+ if (Volatile.Read(ref hasError) == 1)
+ return;
+
+ if (Volatile.Read(ref cancellationRequested) == 1)
+ return;
+
+ response = commandExecution.Response;
+ }
+
+ completionSource.TrySetResult(response);
+ }
+
+ void LocalOnData(object sender, string dataStr)
+ {
+ lock (commandExecutionGate)
+ {
+ if (!commandExecution.TryReadResponse(dataStr))
+ return;
+ }
+
+ TryCompleteSuccessfully();
+ }
+
+ void LocalOnError(object sender, string errorStr)
+ {
+ string errorMessage;
+
+ lock (commandExecutionGate)
+ {
+ if (!commandExecution.TryRecordError(errorStr, out var isError))
+ return;
+
+ if (!isError)
+ {
+ errorMessage = null;
+ }
+ else
+ {
+ errorMessage = commandExecution.Error;
+ }
+ }
+
+ if (errorMessage == null)
+ {
+ TryCompleteSuccessfully();
+ return;
+ }
+
+ // CTRL-C commonly emits stderr while Octave unwinds the interrupted
+ // command. Treat that as cancellation noise instead of a script error.
+ if (Volatile.Read(ref cancellationRequested) == 1)
+ return;
+
+ if (Interlocked.Exchange(ref hasError, 1) == 1)
+ return;
+
+ TryInterruptOrRetireWorker(currentWorkerProcess);
+
+ Task.Run(async () =>
+ {
+ await Task.Delay(ErrorCollectionDelayMilliseconds).ConfigureAwait(false);
+
+ lock (commandExecutionGate)
+ {
+ errorMessage = commandExecution.Error;
+ }
+
+ completionSource.TrySetException(new OctaveScriptError(errorMessage, new IOException()));
+ });
+ }
+
+ void LocalOnExit(object sender, EventArgs eventArgs)
+ {
+ if (completionSource.Task.IsCompleted)
+ return;
+
+ if (Volatile.Read(ref cancellationRequested) == 1)
+ return;
+
+ string errorMessage;
+
+ lock (commandExecutionGate)
+ {
+ errorMessage = commandExecution.Error;
+ }
+
+ currentWorkerProcess.Retire();
+ completionSource.TrySetException(new OctaveScriptError(errorMessage,
+ new InvalidOperationException("The Octave process exited before the command completed.")));
+ }
+
+ CancellationTokenRegistration cancellationRegistration = default;
try
{
- workerProcess.OnData += LocalOnData;
- workerProcess.OnError += LocalOnError;
+ AttachHandlers(currentWorkerProcess, LocalOnData, LocalOnError);
+ AttachExitHandler(currentWorkerProcess, LocalOnExit);
- workerProcess.Write(command);
- workerProcess.Write($"\"{localFinishToken}\"");
+ cancellationRegistration = cancellationToken.Register(() =>
+ {
+ lock (cancellationGate)
+ {
+ CompleteCancellation();
+ }
+ });
- var isDone = localMre.WaitOne(timeout);
+ SetActiveExecutionStop(stopForDispose);
- if (hasError)
+ // The same gate is used by cancellation, Dispose, and startup so a
+ // completed cancellation/dispose request cannot race into a write.
+ lock (cancellationGate)
{
- throw new System.IO.IOException();
+ if (Volatile.Read(ref disposed) == 1)
+ {
+ CompleteForDispose();
+ }
+ else if (cancellationToken.IsCancellationRequested)
+ {
+ CompleteCancellation();
+ }
+ else if (!completionSource.Task.IsCompleted)
+ {
+ Volatile.Write(ref commandStarted, 1);
+ WriteCommand(currentWorkerProcess, command, commandExecution);
+ }
}
+ }
+ catch (IOException exception)
+ {
+ ClearActiveExecutionStop(stopForDispose);
+ DetachHandlers(currentWorkerProcess, LocalOnData, LocalOnError);
+ DetachExitHandler(currentWorkerProcess, LocalOnExit);
+ RetireAndUnloadWorker(currentWorkerProcess);
+
+ cancellationRegistration.Dispose();
- error.Length -= Math.Min(error.Length, Environment.NewLine.Length);
+ throw new OctaveScriptError(commandExecution.Error, exception);
+ }
+ catch (Exception exception) when (ShouldWrapExecutionException(exception))
+ {
+ ClearActiveExecutionStop(stopForDispose);
+ DetachHandlers(currentWorkerProcess, LocalOnData, LocalOnError);
+ DetachExitHandler(currentWorkerProcess, LocalOnExit);
+ RetireAndUnloadWorker(currentWorkerProcess);
- if (!isDone) CommandTimeout();
+ cancellationRegistration.Dispose();
- var response = data.ToString().Trim();
- return response;
+ throw new OctaveScriptError(commandExecution.Error, exception);
+ }
+
+ try
+ {
+ return await completionSource.Task.ConfigureAwait(false);
}
- catch (System.IO.IOException exception)
+ finally
{
- Thread.Sleep(25); // wait in case collected data is incomplete
+ cancellationRegistration.Dispose();
- workerProcess.OnData -= LocalOnData;
- workerProcess.OnError -= LocalOnError;
+ ClearActiveExecutionStop(stopForDispose);
+ DetachHandlers(currentWorkerProcess, LocalOnData, LocalOnError);
+ DetachExitHandler(currentWorkerProcess, LocalOnExit);
- UnloadWorkerProcess();
+ if (workerProcess != null && !currentWorkerProcess.CanBeReused)
+ UnloadWorkerProcess();
+ }
+ }
+
+ public void Dispose()
+ {
+ DisposeCore(true);
+ GC.SuppressFinalize(this);
+ }
+
+ ~OctaveContext()
+ {
+ DisposeCore(false);
+ }
+
+ private void DisposeCore(bool disposing)
+ {
+ if (!TryMarkDisposed())
+ return;
+
+ var semaphoreAcquired = false;
- throw new OctaveScriptError(error.ToString(), exception);
+ if (!executionSemaphore.Wait(0))
+ {
+ RequestActiveExecutionStop();
+
+ if (!disposing)
+ return;
+
+ executionSemaphore.Wait();
+ semaphoreAcquired = true;
+ }
+ else
+ {
+ semaphoreAcquired = true;
+ }
+
+ try
+ {
+ ReleaseWorkerForDispose();
+ }
+ catch
+ {
+ if (disposing)
+ throw;
}
finally
{
- if (workerProcess != null)
+ if (semaphoreAcquired)
+ executionSemaphore.Release();
+
+ if (disposing)
{
- workerProcess.OnData -= LocalOnData;
- workerProcess.OnError -= LocalOnError;
+ noExecutionWaiters.Wait();
+ executionSemaphore.Dispose();
+ noExecutionWaiters.Dispose();
}
}
}
- public void Dispose()
+ private void Initialize()
{
- if (workerProcess == null) return;
+ var initializedWorkerProcess = processPool.GetObject();
+
+ try
+ {
+ initializedWorkerProcess.StandardInput.WriteLine("more off;");
+ initializedWorkerProcess.StandardInput.WriteLine("split_long_rows(0);");
+
+ workerProcess = initializedWorkerProcess;
+ }
+ catch
+ {
+ initializedWorkerProcess.Retire();
+ processPool.ReleaseObject(initializedWorkerProcess);
+
+ throw;
+ }
+ }
+
+ private void ReleaseWorkerForDispose()
+ {
+ if (workerProcess == null)
+ return;
UnloadWorkerProcess();
}
- ~OctaveContext()
+ private OctaveProcess GetOrInitializeWorkerProcess()
{
- Dispose();
+ if (workerProcess == null)
+ Initialize();
+
+ // Keep callers on a stable process reference. Error, timeout, and
+ // cancellation cleanup can clear workerProcess before handlers detach.
+ return workerProcess;
}
- private void Initialize()
+ private void RetireAndUnloadWorker(OctaveProcess currentWorkerProcess)
+ {
+ currentWorkerProcess.Retire();
+
+ if (workerProcess != null)
+ UnloadWorkerProcess();
+ }
+
+ private void TryInterruptOrRetireWorker(OctaveProcess currentWorkerProcess)
+ {
+ try
+ {
+ if (!currentWorkerProcess.HasExited)
+ currentWorkerProcess.SendInterrupt();
+ else
+ currentWorkerProcess.Retire();
+ }
+ catch
+ {
+ currentWorkerProcess.Retire();
+ }
+ }
+
+ private void SetActiveExecutionStop(Action stopExecution)
+ {
+ lock (activeExecutionLock)
+ {
+ activeExecutionStop = stopExecution;
+ }
+ }
+
+ private void ClearActiveExecutionStop(Action stopExecution)
+ {
+ lock (activeExecutionLock)
+ {
+ if (ReferenceEquals(activeExecutionStop, stopExecution))
+ activeExecutionStop = null;
+ }
+ }
+
+ private void RequestActiveExecutionStop()
+ {
+ Action stopExecution;
+
+ lock (activeExecutionLock)
+ {
+ stopExecution = activeExecutionStop;
+ }
+
+ try
+ {
+ stopExecution?.Invoke();
+ }
+ catch
+ {
+ }
+ }
+
+ private void EnterExecution()
+ {
+ lock (executionLifecycleLock)
+ {
+ if (Volatile.Read(ref disposed) == 1)
+ throw CreateObjectDisposedException();
+
+ executionWaiterCount++;
+ noExecutionWaiters.Reset();
+ }
+ }
+
+ private void LeaveExecution()
+ {
+ lock (executionLifecycleLock)
+ {
+ executionWaiterCount--;
+
+ if (executionWaiterCount == 0)
+ noExecutionWaiters.Set();
+ }
+ }
+
+ private bool TryMarkDisposed()
+ {
+ lock (executionLifecycleLock)
+ {
+ if (Volatile.Read(ref disposed) == 1)
+ return false;
+
+ Volatile.Write(ref disposed, 1);
+ return true;
+ }
+ }
+
+ private void ThrowIfDisposed()
+ {
+ if (Volatile.Read(ref disposed) == 1)
+ throw CreateObjectDisposedException();
+ }
+
+ private ObjectDisposedException CreateObjectDisposedException()
+ {
+ return new ObjectDisposedException(GetType().FullName);
+ }
+
+ private bool ShouldWrapExecutionException(Exception exception)
{
- workerProcess = processPool.GetObject();
+ if (exception is OctaveCommandTimeoutException)
+ return false;
- workerProcess.StandardInput.WriteLine("more off;");
- workerProcess.StandardInput.WriteLine("split_long_rows(0);");
+ if (exception is ObjectDisposedException && Volatile.Read(ref disposed) == 1)
+ return false;
+
+ return exception is IOException ||
+ exception is InvalidOperationException ||
+ exception is ObjectDisposedException;
+ }
+
+ private static void AttachHandlers(OctaveProcess process, DataEventHandler onData, DataEventHandler onError)
+ {
+ process.OnData += onData;
+ process.OnError += onError;
+ }
+
+ private static void DetachHandlers(OctaveProcess process, DataEventHandler onData, DataEventHandler onError)
+ {
+ process.OnData -= onData;
+ process.OnError -= onError;
+ }
+
+ private static void AttachExitHandler(OctaveProcess process, EventHandler onExit)
+ {
+ process.Exited += onExit;
+ }
+
+ private static void DetachExitHandler(OctaveProcess process, EventHandler onExit)
+ {
+ process.Exited -= onExit;
+ }
+
+ private static void WriteCommand(OctaveProcess process, string command, CommandExecutionState commandExecution)
+ {
+ // Octave is a REPL, so it does not tell us when one command has
+ // finished. Write one marker to stdout and one to stderr; success
+ // waits for both streams to reach this command boundary.
+ process.Write(command);
+ process.Write($"\"{commandExecution.FinishToken}\"");
+ process.Write($"fdisp(stderr, \"{commandExecution.ErrorFinishToken}\")");
}
private void UnloadWorkerProcess()
@@ -153,15 +801,93 @@ private void UnloadWorkerProcess()
workerProcess = null;
}
- private void CommandTimeout()
+ private void CommandTimeout(OctaveProcess currentWorkerProcess)
{
- //TODO find crossplatform way of doing CTRL+C/equivalent instead of throwing away process.
- if (!workerProcess.HasExited)
- workerProcess.Kill(); // object pool will take care of it since it cannot be reused
-
- UnloadWorkerProcess();
+ try
+ {
+ TryInterruptOrRetireWorker(currentWorkerProcess);
+ }
+ finally
+ {
+ if (workerProcess != null)
+ UnloadWorkerProcess();
+ }
throw new OctaveCommandTimeoutException();
}
+
+ private sealed class CommandExecutionState
+ {
+ private const string FinishTokenPrefix = "ans = ";
+
+ private readonly StringBuilder data = new StringBuilder();
+ private readonly StringBuilder error = new StringBuilder();
+ private bool errorFinished;
+ private bool responseFinished;
+
+ public CommandExecutionState()
+ {
+ FinishToken = Guid.NewGuid().ToString();
+ ErrorFinishToken = Guid.NewGuid().ToString();
+ }
+
+ public string FinishToken { get; }
+
+ public string ErrorFinishToken { get; }
+
+ public bool IsComplete => responseFinished && errorFinished;
+
+ public string Response => data.ToString().Trim();
+
+ public string Error => error.ToString().TrimEnd('\r', '\n');
+
+ public bool TryReadResponse(string dataStr)
+ {
+ if (dataStr == null)
+ return false;
+
+ data.Append(dataStr);
+ data.Append(Environment.NewLine);
+
+ if (!dataStr.Contains(FinishToken))
+ return false;
+
+ // Remove the newline and the echoed token, leaving only the
+ // command's output for the caller.
+ data.Length -= Environment.NewLine.Length;
+ data.Length -= Math.Min(data.Length, FinishTokenPrefix.Length + FinishToken.Length);
+
+ responseFinished = true;
+ return true;
+ }
+
+ public bool TryRecordError(string errorStr, out bool isError)
+ {
+ isError = false;
+
+ if (errorStr == null)
+ return false;
+
+ var finishTokenIndex = errorStr.IndexOf(ErrorFinishToken, StringComparison.Ordinal);
+
+ if (finishTokenIndex >= 0)
+ {
+ if (finishTokenIndex > 0)
+ {
+ isError = true;
+ error.Append(errorStr.Substring(0, finishTokenIndex));
+ error.Append(Environment.NewLine);
+ }
+
+ errorFinished = true;
+ return true;
+ }
+
+ isError = true;
+ error.Append(errorStr);
+ error.Append(Environment.NewLine);
+ return true;
+ }
+ }
}
-}
\ No newline at end of file
+}
diff --git a/src/Octave.NET/OctaveProcess.cs b/src/Octave.NET/OctaveProcess.cs
index 5e96cd0..8965765 100644
--- a/src/Octave.NET/OctaveProcess.cs
+++ b/src/Octave.NET/OctaveProcess.cs
@@ -1,15 +1,21 @@
using Octave.NET.Core.ObjectPooling;
using System;
using System.Diagnostics;
+using System.Runtime.InteropServices;
namespace Octave.NET
{
internal class OctaveProcess : Process, IPoolable
{
+ private const int SigInt = 2;
+ private const int InterruptedProcessExitTimeoutMilliseconds = 1_000;
+
public event DataEventHandler OnData;
public event DataEventHandler OnError;
- private readonly bool isDisposed = false;
+ private bool isDisposed = false;
+ private bool canBeReused = true;
+ private bool interruptSent = false;
public OctaveProcess(string octaveCliPath)
{
@@ -23,8 +29,9 @@ public OctaveProcess(string octaveCliPath)
CreateNoWindow = true
};
- this.OutputDataReceived += OctaveProcess_OutputDataReceived;
- this.ErrorDataReceived += OctaveProcess_ErrorDataReceived;
+ OutputDataReceived += OctaveProcess_OutputDataReceived;
+ ErrorDataReceived += OctaveProcess_ErrorDataReceived;
+ EnableRaisingEvents = true;
try
{
@@ -35,18 +42,47 @@ public OctaveProcess(string octaveCliPath)
throw new Exception($"Unable to run '{octaveCliPath}' executable. Make sure that it exists and/or is added to environment PATH variable.", ex);
}
- this.BeginErrorReadLine();
- this.BeginOutputReadLine();
+ BeginErrorReadLine();
+ BeginOutputReadLine();
- this.StandardInput.WriteLine("warning('off','all');");
+ StandardInput.WriteLine("warning('off','all');");
- this.StandardInput.AutoFlush = false;
+ StandardInput.AutoFlush = false;
}
public void Write(string data)
{
- this.StandardInput.WriteLine(data);
- this.StandardInput.Flush();
+ StandardInput.WriteLine(data);
+ StandardInput.Flush();
+ }
+
+ public void SendInterrupt()
+ {
+ if (HasExited) return;
+
+ // After an interrupt, Octave may still be unwinding the current command.
+ // Do not return this process to the pool for a later context.
+ canBeReused = false;
+ interruptSent = true;
+
+ if (TrySendPosixInterrupt())
+ return;
+
+ // Fallback for platforms where libc kill/SIGINT is unavailable.
+ StandardInput.Write((char)3);
+ StandardInput.Flush();
+ }
+
+ public new void Kill()
+ {
+ // Killed workers cannot be safely reused by the object pool.
+ canBeReused = false;
+ base.Kill();
+ }
+
+ public void Retire()
+ {
+ canBeReused = false;
}
private void OctaveProcess_ErrorDataReceived(object sender, DataReceivedEventArgs e)
@@ -59,16 +95,58 @@ private void OctaveProcess_OutputDataReceived(object sender, DataReceivedEventAr
OnData?.Invoke(sender, e.Data);
}
- public bool CanBeReused => !isDisposed && !HasExited;
+ public bool CanBeReused => canBeReused && !isDisposed && !HasExited;
+
+ private bool TrySendPosixInterrupt()
+ {
+ try
+ {
+ return Kill(Id, SigInt) == 0;
+ }
+ catch (DllNotFoundException)
+ {
+ return false;
+ }
+ catch (Exception exception)
+ {
+ // netstandard1.3 does not expose EntryPointNotFoundException as a
+ // referenceable type, so detect it by name and use the stdin fallback.
+ if (exception.GetType().Name == "EntryPointNotFoundException")
+ return false;
+
+ throw;
+ }
+ }
+
+ [DllImport("libc", EntryPoint = "kill")]
+ private static extern int Kill(int pid, int sig);
protected override void Dispose(bool disposing)
{
if (isDisposed) return;
- if (!HasExited) Kill();
+
+ isDisposed = true;
+
+ try
+ {
+ if (!HasExited)
+ {
+ // Interrupt-driven cleanup gives Octave a short window to unwind
+ // before we fall back to killing the process to avoid orphaned workers.
+ if ((!interruptSent || !WaitForExit(InterruptedProcessExitTimeoutMilliseconds)) && !HasExited)
+ Kill();
+ }
+ }
+ catch (InvalidOperationException)
+ {
+ // A failed or externally disposed Process can report that no
+ // process is associated with it. Disposal should still complete
+ // so the pool can discard the worker and create a replacement.
+ }
base.Dispose(disposing);
}
}
internal delegate void DataEventHandler(object sender, string data);
-}
\ No newline at end of file
+}
diff --git a/src/Octave.NET/OctaveStringExtensions.cs b/src/Octave.NET/OctaveStringExtensions.cs
index 52367c1..bfd3bbd 100644
--- a/src/Octave.NET/OctaveStringExtensions.cs
+++ b/src/Octave.NET/OctaveStringExtensions.cs
@@ -60,7 +60,26 @@ private static double ParseDouble(string number)
if (number.Contains("Inf"))
return double.MaxValue;
- return double.Parse(number, CultureInfo.InvariantCulture);
+ double result;
+
+ try
+ {
+ result = double.Parse(number, CultureInfo.InvariantCulture);
+ }
+ catch (OverflowException)
+ {
+ return number.TrimStart().StartsWith("-")
+ ? double.MinValue
+ : double.MaxValue;
+ }
+
+ if (double.IsPositiveInfinity(result))
+ return double.MaxValue;
+
+ if (double.IsNegativeInfinity(result))
+ return double.MinValue;
+
+ return result;
}
private static string CleanInput(string input)
@@ -72,4 +91,4 @@ private static string CleanInput(string input)
return input;
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Octave.NET/Properties/AssemblyInfo.cs b/src/Octave.NET/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..1f72ec1
--- /dev/null
+++ b/src/Octave.NET/Properties/AssemblyInfo.cs
@@ -0,0 +1,3 @@
+using System.Runtime.CompilerServices;
+
+[assembly: InternalsVisibleTo("Octave.NET.Tests")]