1、简说Akka
Flink 内部节点之间的通信是用 Akka,比如 JobManager 和 TaskManager 之间的通信。 而 operator 之间的数据传输是利用 Netty,所以是不是有必要说一下Akka ?
Akka和Actor
并发问题的核心就是存在数据共享,同时有多个线程对一份数据进行修改,就会出现错乱的情况
解决该问题一般有两种方式 :
1、基于JVM内存模型的设计,通常需要通过加锁等同步机制保证共享数据的一致性。但是加锁在高并发的场景下,往往性能不是很好
2、使用消息传递的方式
Actor的基础就是消息传递,一个Actor可以认为是一个基本的计算单元,它能接收消息并基于其执行运算,它也可以发送消息给其他Actor。Actors 之间相互隔离,它们之间并不共享内存
Actor 本身封装了状态和行为,在进行并发编程时,Actor只需要关注消息和它本身。而消息是一个不可变对象,所以 Actor 不需要去关注锁和内存原子性等一系列多线程常见的问题。
所以Actor是由状态(State)、行为(Behavior)和邮箱(MailBox,可以认为是一个消息队列)三部分组成
状态:Actor 中的状态指Actor对象的变量信息,状态由Actor自己管理,避免了并发环境下的锁和内存原子性等问题
行为:Actor 中的计算逻辑,通过Actor接收到的消息来改变Actor的状态
邮箱:邮箱是 Actor 和 Actor 之间的通信桥梁,邮箱内部通过 FIFO(先入先出)消息队列来存储发送方Actor消息,接受方Actor从邮箱队列中获取消息
任意一个Actor即可发送消息,也可以接受消息
Akka是一个构建在JVM上,基于Actor模型的的并发框架,支持Java和Scala两种API
2、Akka详解
使用 Akka 可以让你从为 Actor 系统创建基础设施和编写控制基本行为所需的初级代码中解脱出来。为了理解这一点,让我们看看在代码中创建的Actor与Akka在内部创建和管理的Actor之间的关系,Actor的生命周期和失败处理
Akka的Actor层级
- Akka的Actor总是属于父Actor。通常,你可以通过调用 getContext().actorOf() 来创建 Actor。与创建一个“独立的”Actor不同,这会将新Actor作为一个子节点注入到已经存在的树中,创建Actor的Actor成为新创建的子Actor的父级。你可能会问,你创造的第一个Actor的父节点是谁?
- 如下图所示,所有的 Actor 都有一个共同的父节点,即用户守护者。可以使用 system.actorOf() 在当前Actor下创建新的Actor实例。创建 Actor 将返回一个有效的 URL 引用。例如,如果我们用 system.actorOf(…, “someActor”) 创建一个名为 someActor 的 Actor,它的引用将包括路径 /user/someActor
-
事实上,在你在代码中创建 Actor 之前,Akka 已经在系统中创建了三个 Actor 。这些内置的 Actor 的名字包含 guardian ,因为他们守护他们所在路径下的每一个子 Actor。守护者 Actor 包括 :
- / ,根守护者( root guardian )。这是系统中所有Actor的父Actor,也是系统本身终止时要停止的最后一个 Actor
- /user ,守护者( guardian )。这是用户创建的所有Actor的父 Actor。不要让用户名混淆,它与最终用户和用户处理无关。使用Akka库创建的每个Actor都将有一个事先准备的固定路径/user/
- /system ,系统守护者( system guardian )。这是除上述三个Actor外,系统创建的所有Actor的父Actor
示例:
public class HierarchyActorTest {
public static void main(String[] args) throws Exception {
ActorSystem system = ActorSystem.create("testSystem");
ActorRef firstActor = system.actorOf(MyPrintActor.props(), "firstActor");
System.out.println("firstActor : " + firstActor);
firstActor.tell("print_info", ActorRef.noSender());
System.out.println(">>> Press ENTER to exit {
ActorRef secondActorRef = getContext().actorOf(Props.empty());
System.out.println("secondActorRef : " + secondActorRef);
}).build();
}
}
}
输出结果 :
firstActor : Actor[akka://testSystem/user/firstActor#-1802697549]
>>> Press ENTER to exit
- 两条路径都以akka://testSystem/开头。因为所有 Actor的引用都是有效的URL, akka://是协议字段的值
- ActorSystem名为testSystem ,但它可以是任何其他名称。如果启用了多个系统之间的远程通 信,则URL的这一部分包括主机名和端口,以便其他系统可以在网络上找到它,下面会有案例
- 因为第二个 Actor的引用包含路径 /firstActor/ ,这个标识它为第一个Actor的子Actor
- Actor引用的最后一部分,即#-1802697549和#-1282757800是唯一标识符
Actor的生命周期
- 既然了解了Actor层次结构的样子,你可能会想 : 为什么我们需要这个层次结构?它是用来干什么的?
- 层次结构的一个重要作用是安全地管理Actor的生命周期。接下来,我们来考虑一下,这些知识如何帮助我们编写更好的代码
- Actor在被创建时就会出现,然后在用户请求时被停止。每当一个Actor被停止时,它的所有子 Actor也会被递归地停止。这种行为大大简化了资源清理,并有助于避免诸如由打开的套接字和文件引起的资源泄漏
- 要停止Actor,建议的模式是调用Actor内部的 getContext().stop(getSelf()) 来停止自身,通常是对某些用户定义的停止消息的响应,或者当Actor完成其任务时
-
Akka Actor的API暴露了许多生命周期的钩子,你可以在 Actor 的实现中覆盖这些钩子。最常用的是 preStart() 和 postStop() 方法
- preStart() 在 Actor 启动之后但在处理其第一条消息之前调用
- postStop() 在 Actor 停止之前调用,在此时之后将不再处理任何消息
示例:
public class Actor1 extends AbstractActor {
static Props props() {
return Props.create(Actor1.class, Actor1:: new);
}
@Override
public void preStart() throws Exception {
System.out.println("first actor started");
getContext().actorOf(Actor2.props(), "second");
}
@Override
public void postStop() throws Exception {
System.out.println("first actor stopped");
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("stop", s -> {
getContext().stop(getSelf());
}).build();
}
}
public class Actor2 extends AbstractActor {
static Props props() {
return Props.create(Actor2.class, Actor2::new);
}
@Override
public void preStart() throws Exception {
System.out.println("second actor started");
}
@Override
public void postStop() throws Exception {
System.out.println("second actor stopped");
}
@Override
public Receive createReceive() {
return receiveBuilder().build();
}
}
public class LifeCycleMain {
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("testSystem");
ActorRef first = system.actorOf(Actor1.props(), "first");
first.tell("stop", ActorRef.noSender());
}
}
待更新。。。,写文章不易,如感兴趣点赞关注,谢谢!
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net