概念
在Orleans中,Streaming是一组API和功能集,它提供了一种构建、发布和消费数据流的方式。
这些流可以是任何类型的数据,从简单的消息到复杂的事件或数据记录。Streaming API允许你定义、发布和消费这些流,而无需关心底层的传输机制或数据存储。
每个流都有一个唯一的标识符,称为StreamId,用于区分不同的流。流可以是持久的,也可以是临时的,具体取决于所使用的流提供者(Stream Provider)。流提供者负责处理流的存储、传输和故障恢复。
作用
Streaming在Orleans中起到了至关重要的作用,主要体现在以下几个方面:
-
解耦:Streaming允许将数据的产生者和消费者解耦。生产者可以发布数据到流中,而消费者可以独立地订阅这些流并处理数据。这种解耦使得系统更加灵活和可扩展。
-
实时性:通过Streaming,你可以实时地处理和响应数据流。这对于需要实时分析、监控或响应的场景非常有用。
-
故障恢复:Orleans的Streaming机制具有强大的故障恢复能力。即使在出现网络分区或节点故障的情况下,流提供者也能够确保数据的可靠性和一致性。
应用场景
-
实时日志分析:你可以将应用程序的日志消息发布到流中,并使用专门的消费者来分析这些日志。这允许你实时地监控和响应应用程序的行为。
-
事件驱动架构:在事件驱动架构中,你可以使用Streaming来发布事件,并由多个消费者来处理这些事件。这有助于构建松耦合、可扩展和响应式的系统。
-
分布式协作:Streaming也可以用于实现分布式系统中的协作和通信。例如,多个节点可以发布状态更新到流中,其他节点可以订阅这些流以获取最新的状态信息。
示例
安装nuget包
"Microsoft.Orleans.Streaming" Version="8.0.0" />
配置Streaming
siloHostBuilder服务器托管.AddMemoryStreams("StreamProvider").AddMemoryGrainStorage("PubSubStore");
定义一个Grain生成事件
public interface ISender : IGrainWithStringKey { Task Send(Guid rid); } public class SenderGrain : Grain, ISender { public Task Send(Guid rid) { var streamProvider = this.GetStreamProvider("StreamProvider"); var streamId = StreamId.Create("RANDOMDATA", rid); var stream = streamProvider.GetStreamint>(streamId); RegisterTimer(_ => { return stream.OnNextAsync(Random.Shared.Next()); }, null, TimeSpan.FromMilliseconds(1_000), TimeSpan.FromMilliseconds(1_000)); return Task.CompletedTask; } }
再定义一个Grain订阅事件
public interface IRandomReceiver : IGrainWithGuidKey { Task Receive(); } [ImplicitStreamSubscription("RANDOMDATA")] public class ReceiverGrain : Grain, IRandomReceiver { public override async Task OnActivateAsync(CancellationToken cancellationToken) { var streamProvider = this.GetStreamProvider("StreamProvider"); var rid = this.GetPrimaryKey(); var streamId = StreamId.Create("RANDOMDATA", rid); var stream = streamProvider.GetStreamint>(streamId); await stream.SubscribeAsyncint>( async (data, token) => { Console.WriteLine(data); await Task.CompletedTask; }); base.OnActivateAsync(cancellationToken); } public async Task Receive() { } }
然后即可测试
var rid = Guid.NewGuid(); 服务器托管var sender1 = client.GetGrain("sender1"); await sender1.Send(rid); var reciver1 = client.GetGrain(new Guid()); await reciver1.Receive();
流提供程序
提供程序可以通过在nuget种搜索Orleans.Streaming,也可以通过PersistentStreamProvider 与 IQueueAdapter 重写来自定义Provider
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
相关推荐: glibc 2.23 源码分析 | malloc & free
1. 基础知识 1.1 Linux 进程内存布局 Linux 系统在装载 elf 格式的程序文件时,会调用 loader 把可执行文件中的各个段依次载入到从某一地址开始的空间中(载入地址取决于 link editor(ld) 和机器地址位数,在 32 位机器上…