Asynchronous Data Methods
- Last UpdatedJan 12, 2026
- 4 minute read
- PI System
- AF SDK 3.2.0
- Developer
Asynchronous data methods provide a mechanism for concurrency during a data access call to the PI Data Archive. In this case, concurrency refers to two types of operations that can execute simultaneously:
- The handling of the data request by the PI Data Archive, possibly over the network.
- Continued execution of client-side code that does not depend on the result of the request.
PI AF SDK 2.8 introduces asynchronous data access methods based on the Task-based Async pattern introduced in .NET Framework 4.5. These methods are implemented in both the OSIsoft.AF.Data and OSIsoft.AF.PI namespaces. They are supported for AF Attributes configured with PI Point data references.
These asynchronous methods can be advantageous in many cases:
- In front-end applications, the UI thread can stay responsive during a data access call.
- In both client and server applications, the number of threads used to service a call can be reduced, as waiting threads can be returned to the thread pool for re-use.
- The effect of latency is mitigated because remote calls can be executed concurrently.
This example demonstrates an asynchronous method that retrieves the summary data for a given attribute of a specific element type. Summaries span the range of one day. The GetSummariesAsyncThrottled() demonstrates throttling asynchronous method calls to avoid one client executing many asynchronous requests and exhausting server resources. The GetSummariesAsyncWithTimeout() demonstrates placing a timeout on asynchronous method calls.
1void Async_Program(AFAttributeList attrList) 2{ 3 try 4 { 5 // Get summaries asynchronously and wait for the result 6 Task<IList<IDictionary<AFSummaryTypes, AFValue>>> summariesTask = GetSummariesAsync(attrList); 7 IList<IDictionary<AFSummaryTypes, AFValue>> summaries = summariesTask.Result; 8 Console.WriteLine("GetSummariesAsync:"); 9 foreach (var summary in summaries) 10 { 11 WriteSummaryItem(summary); 12 } 13 Console.WriteLine(); 14 15 // Get summaries asynchronously limited to 5 concurrent threads and wait for the result 16 summariesTask = GetSummariesAsyncThrottled(attrList, 5); 17 summaries = summariesTask.Result; 18 Console.WriteLine("GetSummariesAsyncThrottled:"); 19 foreach (var summary in summaries) 20 { 21 WriteSummaryItem(summary); 22 } 23 Console.WriteLine(); 24 25 // Get summaries asynchronously with a timeout and wait for the result 26 summariesTask = GetSummariesAsyncWithTimeout(attrList, 1000); 27 summaries = summariesTask.Result; 28 Console.WriteLine("GetSummariesAsyncThrottled:"); 29 foreach (var summary in summaries) 30 { 31 WriteSummaryItem(summary); 32 } 33 Console.WriteLine(); 34 } 35 catch (AggregateException ae) 36 { 37 Console.WriteLine("{0}", ae.Flatten().InnerException.Message); 38 } 39} 40 41private static void WriteSummaryItem(IDictionary<AFSummaryTypes, AFValue> summary) 42{ 43 Console.WriteLine("Summary for {0}", summary[AFSummaryTypes.Minimum].Attribute.Element); 44 Console.WriteLine(" Minimum: {0:N0}", summary[AFSummaryTypes.Minimum].ValueAsDouble()); 45 Console.WriteLine(" Maximum: {0:N0}", summary[AFSummaryTypes.Maximum].ValueAsDouble()); 46 Console.WriteLine(" Average: {0:N0}", summary[AFSummaryTypes.Average].ValueAsDouble()); 47 Console.WriteLine(" Total: {0:N0}", summary[AFSummaryTypes.Total].ValueAsDouble()); 48 Console.WriteLine(); 49} 50 51public static async Task<IList<IDictionary<AFSummaryTypes, AFValue>>> GetSummariesAsync(AFAttributeList attributeList) 52{ 53 Console.WriteLine("Calling GetSummariesAsync\n"); 54 55 Task<IDictionary<AFSummaryTypes, AFValue>>[] tasks = attributeList 56 // Do not make the call if async is not supported 57 .Where(attr => (attr.SupportedDataMethods & AFDataMethods.Asynchronous) == AFDataMethods.Asynchronous) 58 .Select(async attr => 59 { 60 try 61 { 62 AFSummaryTypes mySummaries = AFSummaryTypes.Minimum | AFSummaryTypes.Maximum | AFSummaryTypes.Average | AFSummaryTypes.Total; 63 AFTimeRange timeRange = new AFTimeRange(new AFTime("*-1d"), new AFTime("*")); 64 65 return await attr.Data.SummaryAsync( 66 timeRange: timeRange, 67 summaryType: mySummaries, 68 calculationBasis: AFCalculationBasis.TimeWeighted, 69 timeType: AFTimestampCalculation.Auto); 70 } 71 catch (AggregateException ae) 72 { 73 Console.WriteLine("{0}: {1}", attr.Name, ae.Flatten().InnerException.Message); 74 return null; 75 } 76 }) 77 .ToArray(); 78 79 return await Task.WhenAll(tasks); 80} 81 82public static async Task<IList<IDictionary<AFSummaryTypes, AFValue>>> GetSummariesAsyncThrottled(AFAttributeList attributeList, int numConcurrent) 83{ 84 // Use "asynchronous semaphore" pattern (e.g. SemaphoreSlim.WaitAsync()) to throttle the calls 85 Console.WriteLine("Calling GetSummariesAsyncThrottled"); 86 87 // Example: Limit to numConcurrent concurrent async I/O operations. 88 SemaphoreSlim throttler = new SemaphoreSlim(initialCount: numConcurrent); 89 90 Task<IDictionary<AFSummaryTypes, AFValue>>[] tasks = attributeList 91 // Do not make the call if async is not supported 92 .Where(attr => (attr.SupportedDataMethods & AFDataMethods.Asynchronous) == AFDataMethods.Asynchronous) 93 .Select(async attr => 94 { 95 // asynchronously try to acquire the semaphore 96 await throttler.WaitAsync(); 97 98 try 99 { 100 AFSummaryTypes mySummaries = AFSummaryTypes.Minimum | AFSummaryTypes.Maximum | AFSummaryTypes.Average | AFSummaryTypes.Total; 101 AFTimeRange timeRange = new AFTimeRange(new AFTime("*-1d"), new AFTime("*")); 102 103 return await attr.Data.SummaryAsync( 104 timeRange: timeRange, 105 summaryType: mySummaries, 106 calculationBasis: AFCalculationBasis.TimeWeighted, 107 timeType: AFTimestampCalculation.Auto); 108 } 109 catch (AggregateException ae) 110 { 111 Console.WriteLine("{0}: {1}", attr.Name, ae.Flatten().InnerException.Message); 112 return null; 113 } 114 finally 115 { 116 // release the resource 117 throttler.Release(); 118 } 119 }) 120 .ToArray(); 121 122 return await Task.WhenAll(tasks); 123} 124 125public static async Task<IList<IDictionary<AFSummaryTypes, AFValue>>> GetSummariesAsyncWithTimeout(AFAttributeList attributeList, int timeoutInMilliseconds) 126{ 127 // Use a "competing tasks" pattern to place timeout on multiple async requests 128 Console.WriteLine("Calling GetSummariesAsyncWithTimeout"); 129 130 CancellationTokenSource cts = new CancellationTokenSource(); 131 CancellationToken token = cts.Token; 132 CancellationTokenSource ctsForTimer = new CancellationTokenSource(); 133 CancellationToken tokenForTimer = ctsForTimer.Token; 134 135 Task<IDictionary<AFSummaryTypes, AFValue>>[] tasks = attributeList 136 // Do not make the call if async is not supported 137 .Where(attr => (attr.SupportedDataMethods & AFDataMethods.Asynchronous) == AFDataMethods.Asynchronous) 138 .Select(async attr => 139 { 140 try 141 { 142 AFSummaryTypes mySummaries = AFSummaryTypes.Minimum | AFSummaryTypes.Maximum | AFSummaryTypes.Average | AFSummaryTypes.Total; 143 AFTimeRange timeRange = new AFTimeRange(new AFTime("*-1d"), new AFTime("*")); 144 145 return await attr.Data.SummaryAsync( 146 timeRange: timeRange, 147 summaryType: mySummaries, 148 calculationBasis: AFCalculationBasis.TimeWeighted, 149 timeType: AFTimestampCalculation.Auto, 150 cancellationToken: token); 151 } 152 catch (AggregateException ae) 153 { 154 Console.WriteLine("{0}: {1}", attr.Element.Name, ae.Flatten().InnerException.Message); 155 return null; 156 } 157 catch (OperationCanceledException oe) 158 { 159 Console.WriteLine("{0}: {1}", attr.Element.Name, oe.Message); 160 return null; 161 } 162 }) 163 .ToArray(); 164 165 // Define a task that completes when all subtasks are complete 166 Task<IDictionary<AFSummaryTypes, AFValue>[]> allTtasks = Task.WhenAll(tasks); 167 168 // Asynchronously wait for either the summaries or timer task to complete 169 var finishedTask = await Task.WhenAny(allTtasks, Task.Delay(timeoutInMilliseconds, tokenForTimer)); 170 if (finishedTask.Equals(allTtasks)) 171 { 172 // Cancel the timer task 173 ctsForTimer.Cancel(); 174 // Return summaries result 175 return allTtasks.Result; 176 } 177 else 178 { 179 // Cancel the summaries task if timeout 180 cts.Cancel(); 181 throw new TimeoutException("The operation has timed out."); 182 } 183}