工作流引擎是现代企业应用中不可或缺的组件,它能够帮助我们构建复杂的业务流程,实现流程的可视化、自动化和标准化。WorkflowCore 是一个开源的.NET 工作流引擎,提供了丰富的功能和灵活的扩展性。
WorkflowCore 简介核心特性
轻量级设计 - 无需外部依赖,易于集成
多种流程模式 - 支持顺序、并行、条件分支、循环等
状态持久化 - 支持多种存储后端(内存、SQL、MongoDB 等)
分布式执行 - 支持多节点集群部署
事件驱动 - 支持外部事件触发流程继续执行
规则引擎集成 - 可与 RulesEngine 等规则引擎配合使用
适用场景
场景类型
描述
示例
审批流程
多级审批、条件分支
请假申请、费用报销
数据处理
ETL、数据同步
数据清洗、格式转换
业务流程
订单处理、支付流程
电商订单、金融交易
系统集成
服务编排、API 调用
微服务协调、第三方集成
环境准备与配置🔧 NuGet 包安装1234
🏗️ 依赖注入配置123456789101112131415public void ConfigureServices(IServiceCollection services){ // 添加工作流核心服务 services.AddWorkflow(); // 添加自定义步骤 services.AddTransient(); services.AddTransient(); services.AddTransient(); services.AddTransient(); services.AddTransient(); // 可选:添加其他存储提供程序 // services.AddWorkflow(x => x.UseSqlServer(@"Server=.;Database=WorkflowCore;Trusted_Connection=true;", true, true));}
工作流模式实现1. 顺序工作流顺序工作流是最基础的流程模式,步骤按顺序依次执行。
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758/// /// 顺序工作流定义/// public class SequentialWorkflow : IWorkflow{ public string Id => "SequentialWorkflow"; public int Version => 1; public void Build(IWorkflowBuilder builder) { builder .StartWith() .Then() .Then(); }}// 工作流步骤实现public class StepA : StepBody{ public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine($"Step A 执行 - 输入数据: {context.Workflow.Data}"); // 可以访问和修改工作流数据 var data = (int)context.Workflow.Data; context.Workflow.Data = data * 2; return ExecutionResult.Next(); }}public class StepB : StepBody{ public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine($"Step B 执行 - 当前数据: {context.Workflow.Data}"); // 模拟一些业务逻辑 var data = (int)context.Workflow.Data; if (data > 10) { Console.WriteLine("数据已超过阈值,执行特殊处理"); } return ExecutionResult.Next(); }}public class StepC : StepBody{ public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine($"Step C 执行 - 最终数据: {context.Workflow.Data}"); Console.WriteLine("顺序流程执行完成!"); return ExecutionResult.Next(); }}
🔀 2. 状态机工作流状态机工作流适用于需要等待外部事件或具有复杂状态转换的场景。
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869/// /// 订单状态机工作流/// public class OrderStateMachine : IWorkflow{ public string Id => "OrderStateMachine"; public int Version => 1; public void Build(IWorkflowBuilder builder) { builder .StartWith(context => { var orderData = (OrderData)context.Workflow.Data; orderData.Status = "Created"; orderData.CreateTime = DateTime.Now; Console.WriteLine($"订单创建: {orderData.OrderId} - 状态: {orderData.Status}"); }) // 等待付款事件 .WaitFor("PendingPaymentEvent", data => data.OrderId) .Then(context => { var orderData = (OrderData)context.Workflow.Data; orderData.Status = "Paid"; orderData.PaymentTime = DateTime.Now; Console.WriteLine($"订单 {orderData.OrderId} 已付款"); // 可以在这里添加付款后的业务逻辑 // 比如:库存扣减、发送确认邮件等 }) // 等待发货事件 .WaitFor("ShipOrderEvent", data => data.OrderId) .Then(context => { var orderData = (OrderData)context.Workflow.Data; orderData.Status = "Shipped"; orderData.ShipTime = DateTime.Now; Console.WriteLine($"订单 {orderData.OrderId} 已发货"); // 发货后业务逻辑 // 比如:更新物流信息、发送发货通知等 }) // 等待订单完成事件 .WaitFor("CompleteOrderEvent", data => data.OrderId) .Then(context => { var orderData = (OrderData)context.Workflow.Data; orderData.Status = "Completed"; orderData.CompleteTime = DateTime.Now; Console.WriteLine($"订单 {orderData.OrderId} 已完成"); // 完成后业务逻辑 // 比如:积分奖励、评价提醒等 }); }}// 订单数据模型public class OrderData{ public string OrderId { get; set; } public string Status { get; set; } public DateTime CreateTime { get; set; } public DateTime? PaymentTime { get; set; } public DateTime? ShipTime { get; set; } public DateTime? CompleteTime { get; set; } public decimal Amount { get; set; } public string CustomerName { get; set; }}
🔀 3. 并行工作流并行工作流允许多个步骤同时执行,适用于可以并行处理的业务场景。
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158/// /// 并行审批工作流/// public class ParallelApprovalWorkflow : IWorkflow{ public string Id => "ParallelApprovalWorkflow"; public int Version => 1; public void Build(IWorkflowBuilder builder) { builder .StartWith(context => { var data = (ApprovalData)context.Workflow.Data; Console.WriteLine($"启动并行审批流程 - 申请ID: {data.ApplicationId}"); data.StartTime = DateTime.Now; }) // 并行执行多个审批步骤 .Parallel() .Do(branch => branch.StartWith() .Then()) .Do(branch => branch.StartWith() .Then()) .Do(branch => branch.StartWith()) .Join() // 所有并行分支完成后执行 .Then() .Then(context => { var data = (ApprovalData)context.Workflow.Data; data.EndTime = DateTime.Now; data.TotalDuration = data.EndTime.Value - data.StartTime.Value; Console.WriteLine($"并行审批流程完成 - 总耗时: {data.TotalDuration.Value.TotalSeconds:F2}秒"); }); }}// 审批步骤实现public class DepartmentAApproval : StepBody{ public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 部门A审批中..."); // 模拟审批处理时间 Thread.Sleep(1000); var data = (ApprovalData)context.Workflow.Data; data.DeptAApproved = true; data.DeptAApprovalTime = DateTime.Now; Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 部门A审批完成 ✓"); return ExecutionResult.Next(); }}public class ManagerAApproval : StepBody{ public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 经理A审批中..."); Thread.Sleep(800); var data = (ApprovalData)context.Workflow.Data; data.ManagerAApproved = true; Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 经理A审批完成 ✓"); return ExecutionResult.Next(); }}public class DepartmentBApproval : StepBody{ public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 部门B审批中..."); Thread.Sleep(1200); var data = (ApprovalData)context.Workflow.Data; data.DeptBApproved = true; data.DeptBApprovalTime = DateTime.Now; Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 部门B审批完成 ✓"); return ExecutionResult.Next(); }}public class ManagerBApproval : StepBody{ public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 经理B审批中..."); Thread.Sleep(600); var data = (ApprovalData)context.Workflow.Data; data.ManagerBApproved = true; Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 经理B审批完成 ✓"); return ExecutionResult.Next(); }}public class FinanceApproval : StepBody{ public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 财务审批中..."); Thread.Sleep(900); var data = (ApprovalData)context.Workflow.Data; data.FinanceApproved = true; data.FinanceApprovalTime = DateTime.Now; Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 财务审批完成 ✓"); return ExecutionResult.Next(); }}public class FinalApprovalStep : StepBody{ public override ExecutionResult Run(IStepExecutionContext context) { var data = (ApprovalData)context.Workflow.Data; bool allApproved = data.DeptAApproved && data.DeptBApproved && data.FinanceApproved && data.ManagerAApproved && data.ManagerBApproved; data.FinalResult = allApproved ? "Approved" : "Rejected"; Console.WriteLine($"最终审批结果: {data.FinalResult}"); return ExecutionResult.Next(); }}// 审批数据模型public class ApprovalData{ public string ApplicationId { get; set; } = Guid.NewGuid().ToString("N")[..8]; public DateTime StartTime { get; set; } public DateTime? EndTime { get; set; } public TimeSpan? TotalDuration { get; set; } public bool DeptAApproved { get; set; } public bool DeptBApproved { get; set; } public bool FinanceApproved { get; set; } public bool ManagerAApproved { get; set; } public bool ManagerBApproved { get; set; } public DateTime? DeptAApprovalTime { get; set; } public DateTime? DeptBApprovalTime { get; set; } public DateTime? FinanceApprovalTime { get; set; } public string FinalResult { get; set; }}
4. 规则决策工作流结合 RulesEngine 实现基于规则的决策流程。
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133/// /// 规则决策工作流/// public class RuleBasedWorkflow : IWorkflow{ public string Id => "RuleBasedWorkflow"; public int Version => 1; public void Build(IWorkflowBuilder builder) { builder .StartWith() .If(data => data.Age >= 18) .Do(branch => branch .StartWith() .If(data => data.Age >= 65) .Do(seniorBranch => seniorBranch.StartWith()) .End()) .If(data => data.Age < 18) .Do(branch => branch.StartWith()) .End() .Then(); }}public class ValidatePersonStep : StepBody{ public override ExecutionResult Run(IStepExecutionContext context) { var person = (Person)context.Workflow.Data; Console.WriteLine($"验证人员信息: {person.Name}, 年龄: {person.Age}"); if (string.IsNullOrEmpty(person.Name) || person.Age < 0) { Console.WriteLine("人员信息验证失败"); return ExecutionResult.Next(); // 或者返回错误 } Console.WriteLine("人员信息验证通过"); return ExecutionResult.Next(); }}public class AdultProcessingStep : StepBody{ public override ExecutionResult Run(IStepExecutionContext context) { var person = (Person)context.Workflow.Data; Console.WriteLine($"处理成年人 {person.Name} 的相关业务"); return ExecutionResult.Next(); }}public class MinorProcessingStep : StepBody{ public override ExecutionResult Run(IStepExecutionContext context) { var person = (Person)context.Workflow.Data; Console.WriteLine($"处理未成年人 {person.Name} 的相关业务(需要监护人同意)"); return ExecutionResult.Next(); }}public class SeniorCitizenStep : StepBody{ public override ExecutionResult Run(IStepExecutionContext context) { var person = (Person)context.Workflow.Data; Console.WriteLine($"为老年人 {person.Name} 提供特殊服务"); return ExecutionResult.Next(); }}public class FinalProcessingStep : StepBody{ public override ExecutionResult Run(IStepExecutionContext context) { var person = (Person)context.Workflow.Data; Console.WriteLine($"完成 {person.Name} 的所有处理流程"); return ExecutionResult.Next(); }}// 使用RulesEngine进行更复杂的规则处理public static async Task RulesEngineExample(){ var ruleJson = new string[] { @"{ ""WorkflowName"": ""PersonAgeRule"", ""Rules"": [ { ""RuleName"": ""IsAdult"", ""SuccessEvent"": ""Adult"", ""ErrorMessage"": ""Underage"", ""Expression"": ""input1.Age >= 18"" }, { ""RuleName"": ""IsSenior"", ""SuccessEvent"": ""Senior"", ""ErrorMessage"": ""Not Senior"", ""Expression"": ""input1.Age >= 65"" }, { ""RuleName"": ""HasValidName"", ""SuccessEvent"": ""ValidName"", ""ErrorMessage"": ""Invalid Name"", ""Expression"": ""!string.IsNullOrEmpty(input1.Name)"" } ] }" }; var reSettings = new ReSettings { CustomTypes = new[] { typeof(Person) } }; var engine = new RulesEngine.RulesEngine(ruleJson, reSettings); var person = new Person { Name = "张三", Age = 25 }; var results = await engine.ExecuteAllRulesAsync("PersonAgeRule", person); foreach (var result in results) { Console.WriteLine($"规则 {result.Rule.RuleName}: {(result.IsSuccess ? "通过" : "失败")} - {result.Rule.SuccessEvent}"); }}// 人员数据模型public class Person{ public string Name { get; set; } public int Age { get; set; } public string Email { get; set; } public string PhoneNumber { get; set; }}
完整使用示例🚀 主程序实现123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122public static async Task WorkFlowTest(){ // 1. 配置 DI 和 Workflow Host var services = new ServiceCollection() .AddLogging() .AddWorkflow(); // 默认使用内存存储 // 注册工作流步骤 services.AddTransient(); services.AddTransient(); services.AddTransient(); services.AddTransient(); services.AddTransient(); services.AddTransient(); services.AddTransient(); services.AddTransient(); services.AddTransient(); var serviceProvider = services.BuildServiceProvider(); var host = serviceProvider.GetService(); if (host == null) { throw new InvalidOperationException("无法创建 WorkflowHost"); } // 注册工作流定义 host.RegisterWorkflow(); host.RegisterWorkflow(); host.RegisterWorkflow(); host.RegisterWorkflow(); // 启动工作流主机 host.Start(); try { // 2. 顺序流程示例 Console.WriteLine("=== 顺序流程测试 ==="); var seqId = await host.StartWorkflow("SequentialWorkflow", 1, 3); await WaitForCompletion(host, seqId); // 3. 状态机流程示例 Console.WriteLine("\n=== 状态机流程测试 ==="); var order = new OrderData { OrderId = $"ORD{DateTime.Now:yyyyMMddHHmmss}", Amount = 299.99m, CustomerName = "张三" }; var smId = await host.StartWorkflow("OrderStateMachine", 1, order); // 模拟外部事件触发 await Task.Delay(500); Console.WriteLine("模拟用户付款..."); await host.PublishEvent("PendingPaymentEvent", order.OrderId, null); await Task.Delay(1000); Console.WriteLine("模拟商家发货..."); await host.PublishEvent("ShipOrderEvent", order.OrderId, null); await Task.Delay(1000); Console.WriteLine("模拟用户确认收货..."); await host.PublishEvent("CompleteOrderEvent", order.OrderId, null); await WaitForCompletion(host, smId); // 4. 并行流程示例 Console.WriteLine("\n=== 并行审批流程测试 ==="); var approvalData = new ApprovalData(); var parId = await host.StartWorkflow("ParallelApprovalWorkflow", 1, approvalData); await WaitForCompletion(host, parId); // 5. 规则决策流程示例 Console.WriteLine("\n=== 规则决策流程测试 ==="); var persons = new[] { new Person { Name = "小明", Age = 16 }, new Person { Name = "张三", Age = 25 }, new Person { Name = "李奶奶", Age = 68 } }; foreach (var person in persons) { Console.WriteLine($"\n处理人员: {person.Name}"); var ruleId = await host.StartWorkflow("RuleBasedWorkflow", 1, person); await WaitForCompletion(host, ruleId); } // 6. RulesEngine示例 Console.WriteLine("\n=== RulesEngine 规则测试 ==="); await RulesEngineExample(); } finally { host.Stop(); Console.WriteLine("\n所有工作流测试完成。"); }}// 等待工作流完成的辅助方法static async Task WaitForCompletion(IWorkflowHost host, string workflowId){ WorkflowStatus status; do { await Task.Delay(200); var instance = await host.PersistenceStore.GetWorkflowInstance(workflowId); status = instance.Status; if (status == WorkflowStatus.Runnable) { Console.WriteLine($"工作流 {workflowId} 正在运行中..."); } } while (status != WorkflowStatus.Complete && status != WorkflowStatus.Terminated && status != WorkflowStatus.Suspended); Console.WriteLine($"工作流 {workflowId} 执行完成,状态: {status}");}
高级特性与优化技巧错误处理与重试12345678910111213141516171819202122232425262728293031323334353637383940414243public class RobustStep : StepBody{ public int MaxRetries { get; set; } = 3; public TimeSpan RetryInterval { get; set; } = TimeSpan.FromSeconds(1); public override ExecutionResult Run(IStepExecutionContext context) { try { // 执行可能失败的操作 DoSomeRiskyOperation(); return ExecutionResult.Next(); } catch (Exception ex) { var retryCount = context.PersistenceData.GetValueOrDefault("retryCount", 0); if (retryCount < MaxRetries) { context.PersistenceData["retryCount"] = retryCount + 1; Console.WriteLine($"步骤执行失败,将在 {RetryInterval.TotalSeconds} 秒后重试 (第 {retryCount + 1} 次)"); return ExecutionResult.Sleep(RetryInterval, new object()); } else { Console.WriteLine($"步骤执行失败,已超过最大重试次数: {ex.Message}"); return ExecutionResult.Next(); // 或者 ExecutionResult.Terminate() } } } private void DoSomeRiskyOperation() { // 模拟可能失败的操作 if (new Random().NextDouble() < 0.7) // 70% 失败率 { throw new InvalidOperationException("模拟的操作失败"); } Console.WriteLine("操作执行成功"); }}
工作流监控与日志1234567891011121314151617181920212223242526272829303132333435363738public class MonitoringWorkflowStep : StepBody{ private readonly ILogger _logger; public MonitoringWorkflowStep(ILogger logger) { _logger = logger; } public override ExecutionResult Run(IStepExecutionContext context) { var stopwatch = Stopwatch.StartNew(); try { _logger.LogInformation($"开始执行步骤: {GetType().Name}"); // 执行业务逻辑 ExecuteBusinessLogic(context); stopwatch.Stop(); _logger.LogInformation($"步骤执行成功,耗时: {stopwatch.ElapsedMilliseconds}ms"); return ExecutionResult.Next(); } catch (Exception ex) { stopwatch.Stop(); _logger.LogError(ex, $"步骤执行失败,耗时: {stopwatch.ElapsedMilliseconds}ms"); throw; } } protected virtual void ExecuteBusinessLogic(IStepExecutionContext context) { // 子类实现具体业务逻辑 }}
🎛️ 动态工作流配置1234567891011121314151617181920212223242526272829303132333435363738394041424344public class DynamicWorkflowBuilder{ public static IWorkflow CreateWorkflowFromConfig(WorkflowConfig config) { return new DynamicWorkflow(config); }}public class DynamicWorkflow : IWorkflow{ private readonly WorkflowConfig _config; public DynamicWorkflow(WorkflowConfig config) { _config = config; } public string Id => _config.Id; public int Version => _config.Version; public void Build(IWorkflowBuilder builder) { var currentBuilder = builder.StartWith(_config.Steps.First().StepType); foreach (var step in _config.Steps.Skip(1)) { currentBuilder = currentBuilder.Then(step.StepType); } }}public class WorkflowConfig{ public string Id { get; set; } public int Version { get; set; } public List Steps { get; set; } = new();}public class StepConfig{ public string Name { get; set; } public Type StepType { get; set; } public Dictionary Parameters { get; set; } = new();}
性能优化建议⚡ 批量处理优化123456789101112131415161718192021222324252627public class BatchProcessingStep : StepBody{ public override ExecutionResult Run(IStepExecutionContext context) { var batchData = (List)context.Workflow.Data; // 分批处理,避免内存溢出 const int batchSize = 100; for (int i = 0; i < batchData.Count; i += batchSize) { var batch = batchData.Skip(i).Take(batchSize); ProcessBatch(batch); } return ExecutionResult.Next(); } private void ProcessBatch(IEnumerable batch) { Parallel.ForEach(batch, ProcessItem); } private void ProcessItem(DataItem item) { // 处理单个数据项 }}
🔧 资源管理1234567891011121314151617181920212223242526272829303132333435public class ResourceAwareStep : StepBody, IDisposable{ private readonly SemaphoreSlim _semaphore = new(5, 5); // 限制并发数 private bool _disposed = false; public override async Task RunAsync(IStepExecutionContext context) { await _semaphore.WaitAsync(); try { // 执行资源密集型操作 await ExecuteResourceIntensiveOperation(); return ExecutionResult.Next(); } finally { _semaphore.Release(); } } private async Task ExecuteResourceIntensiveOperation() { // 模拟资源密集型操作 await Task.Delay(1000); } public void Dispose() { if (!_disposed) { _semaphore?.Dispose(); _disposed = true; } }}
小结WorkflowCore 为 C#开发者提供了实用的工作流构建能力,它的主要优势包括:
简单易用 - 直观的流程定义 API,易于理解和维护
🔧 灵活扩展 - 支持自定义步骤、事件和存储提供程序
⚡ 高性能 - 异步执行模型,支持大规模并发处理
🛡️ 可靠性 - 内置错误处理、重试机制和状态持久化
可监控 - 丰富的执行状态信息和日志记录
适用场景总结
工作流类型
适用场景
关键特性
顺序工作流
数据处理管道、简单审批
步骤依次执行
状态机工作流
订单处理、长期业务流程
事件驱动、状态转换
并行工作流
多部门审批、批量处理
并发执行、汇聚等待
规则决策工作流
动态路由、条件分支
规则引擎集成
通过合理使用 WorkflowCore,我们可以构建出既灵活又可靠的企业级工作流系统,有效提升业务流程的自动化程度和执行效率。
💡 开发经验分享:
模块化设计: 将复杂工作流拆分为可重用的子流程
状态监控: 实施完善的工作流执行监控和告警机制
测试覆盖: 为工作流定义和步骤编写充分的单元测试
性能调优: 根据实际负载调整存储后端和并发配置
相关资源:
WorkflowCore GitHub
RulesEngine GitHub
官方文档