The Kubernetes controller-manager bundles the controllers for all of Kubernetes' built-in resource objects; writing a controller for a CRD you have created is, in essence, implementing an Operator.
The Operator pattern was introduced by CoreOS as an application-specific controller that extends the Kubernetes API. An Operator can create, configure, and manage complex stateful applications such as databases, caches, and monitoring systems. It builds on Kubernetes' resource and controller concepts while embedding application-specific domain knowledge. The key to building one is designing the CRD and writing its controller.
In short, an Operator is the combination of a CRD design and the controller written for that CRD.
Kubernetes' built-in controllers cannot cover every need, but the Operator mechanism, part of Kubernetes' highly extensible design, lets users solve such domain-specific problems themselves.
1. Preparation
After the CRD has been created in the cluster, we can start writing the controller. The standard Clientset only supports Kubernetes' built-in resource objects, and the same is true of Informers, so we need to generate client-go style client code for our own CRD. The canary controller project is laid out as follows.
·The manifests directory holds the CRD definition YAML files.
·pkg has four subdirectories:
■apis holds the code we prepare by hand, from which the code-generation tool produces client code.
■controllers holds the controller's core code.
■generated and signals hold the client code produced by the https://github.com/kubernetes/code-generator tool.
·main.go is the controller's entry point.
·go.mod declares the dependencies.
2. Writing the code under apis
Start with the code in the apis directory. /pkg/apis/canarycontroller/register.go records the CRD's group name in the GroupName constant:
package canarycontroller
const (
GroupName = "canarycontroller.tech.com"
)
Next, /pkg/apis/canarycontroller/v1alpha1/doc.go carries the package-level code-generation markers and the group-name annotation:
//+k8s:deepcopy-gen=package
//+groupName=canarycontroller.tech.com
package v1alpha1 //import "canary-controller/pkg/apis/canarycontroller/v1alpha1"
The concrete fields of the CRD are all defined in the types file, /pkg/apis/canarycontroller/v1alpha1/types.go:
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
//+genclient
//+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type Canary struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec CanarySpec `json:"spec"`
Status CanaryStatus `json:"status"`
}
type CanarySpec struct {
Info map[string]string `json:"info"`
}
type CanaryStatus struct {
Info map[string]string `json:"info"`
}
//+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type CanaryList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []Canary `json:"items"`
}
·The types file defines the entity structure of the Canary resource object; like built-in Kubernetes resources, it has metadata, spec, and status fields.
·Spec defines an Info field that records the canary release policy. Because it is a map[string]string, fields can be added or removed during development simply by adding or removing keys, with no need to rerun the code generator.
·Status likewise defines an Info field, another map[string]string, which records the state of an in-progress canary release.
·Items in CanaryList is the list structure for Canary objects.
The registration code lives in /pkg/apis/canarycontroller/v1alpha1/register.go:
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
canarycontroller "canary-controller/pkg/apis/canarycontroller"
)
var SchemeGroupVersion = schema.GroupVersion{Group: canarycontroller.GroupName,
Version: "v1alpha1"}
func Kind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
var (
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
)
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&Canary{},
&CanaryList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
Here, in the addKnownTypes() function, we simply register pointers to the types defined in types.go.
3. Generating the CRD's client-go client code
On a Linux system, download the code-generator tool:
git clone https://github.com/kubernetes/code-generator
Upload the canary-controller project directory, including the apis directory, to the Linux server. Afterwards the directory tree looks like this:
[root@golang src]# tree
└── canary-controller
└── pkg
└── apis
└── canarycontroller
├── register.go
└── v1alpha1
├── doc.go
├── register.go
└── types.go
Fetch the dependencies, enter the code-generator directory, and run the generate-groups.sh script to generate the client code:
cd $GOPATH/src \
&& go get -u k8s.io/apimachinery/pkg/apis/meta/v1 \
&& cd $GOPATH/src/k8s.io/code-generator \
&& ./generate-groups.sh all \
canary-controller/pkg/client \
canary-controller/pkg/apis \
canarycontroller:v1alpha1
On success, the newly generated code is:
File: /pkg/apis/canarycontroller/v1alpha1/zz_generated.deepcopy.go.
Directory: /pkg/client. Simply rename /pkg/client to /pkg/generated.
4. Writing the canary controller's main logic
The controller's main flow and logic are fairly complex, so we will first walk through what each method does and then piece together the overall execution flow.
First, the imports:
package controllers
import (
"context"
"encoding/json"
"fmt"
"math"
"strconv"
"strings"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
appsinformers "k8s.io/client-go/informers/apps/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
appslisters "k8s.io/client-go/listers/apps/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
canaryv1alpha1 "canary-controller/pkg/apis/canarycontroller/v1alpha1"
clientset "canary-controller/pkg/generated/clientset/versioned"
canaryscheme "canary-controller/pkg/generated/clientset/versioned/scheme"
informers "canary-controller/pkg/generated/informers/externalversions/canarycontroller/v1alpha1"
listers "canary-controller/pkg/generated/listers/canarycontroller/v1alpha1"
)
Next, define the event reason/message constants and the Controller struct. These events are what you see when you run kubectl describe against the resource object:
const controllerAgentName = "canary-controller"
const (
//SuccessSynced is registered as an Event reason; shown when a Canary
//syncs successfully
SuccessSynced = "Synced"
//ErrResourceExists is registered as an Event reason; shown when a Canary
//fails to sync because the Deployment already exists
ErrResourceExists = "ErrResourceExists"
//Event message for a resource that is not managed by canary-controller
//MessageResourceExists = "Resource %q already exists and is not managed by Canary"
//MessageResourceSynced is the Event message shown when a Canary syncs successfully
MessageResourceSynced = "Canary synced successfully"
)
//Controller is the canary controller structure
type Controller struct {
//kubeclientset is the standard Kubernetes Clientset
kubeclientset kubernetes.Interface
//canaryclientset is the Clientset for Canary objects
canaryclientset clientset.Interface
deploymentsLister appslisters.DeploymentLister
deploymentsSynced cache.InformerSynced
canariesLister listers.CanaryLister
canariesSynced cache.InformerSynced
//workqueue is a rate-limited work queue. It ensures that, under concurrency, an item
//is processed by only one worker goroutine at a time, and it supports requeueing with back-off
workqueue workqueue.RateLimitingInterface
//recorder records events on resource objects
recorder record.EventRecorder
}
The NewController function looks like this:
//NewController builds the canary-controller object
func NewController(
kubeclientset kubernetes.Interface,
canaryclientset clientset.Interface,
deploymentInformer appsinformers.DeploymentInformer,
canaryInformer informers.CanaryInformer) *Controller {
//Add the canary-controller types to the default Kubernetes scheme so that
//event logs can print canary-controller types, then create the event broadcaster
utilruntime.Must(canaryscheme.AddToScheme(scheme.Scheme))
klog.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
controller := &Controller{
kubeclientset: kubeclientset,
canaryclientset: canaryclientset,
deploymentsLister: deploymentInformer.Lister(),
deploymentsSynced: deploymentInformer.Informer().HasSynced,
canariesLister: canaryInformer.Lister(),
canariesSynced: canaryInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Canarys"),
recorder: recorder,
}
klog.Info("Setting up event handlers")
//Register event handler callbacks for Canary changes
canaryInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueCanary,
UpdateFunc: func(old, new interface{}) {
controller.enqueueCanary(new)
},
})
//Register handlers for Deployment changes. An owner reference could be used here to
//ensure that only Deployments managed by a Canary are put onto the Canary workqueue;
//we do not filter in this example. For details, see:
//https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newDepl := new.(*appsv1.Deployment)
oldDepl := old.(*appsv1.Deployment)
//A resync periodically delivers the latest Deployment state on the configured interval;
//identical resourceVersions mean nothing has changed
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
return
}
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})
return controller
}
NewController mainly does the following.
·Stores the Kubernetes and Canary Clientsets together with the Deployment and Canary listers.
·Subscribes to Canary create and update events; when one fires, the Canary object is put onto the workqueue by the controller.enqueueCanary method.
·Subscribes to Deployment add, update, and delete events; when one fires, the Deployment object is handed to the handleObject method.
The Run() method starts the Controller. The original excerpt is truncated; the listing below restores it following the standard sample-controller pattern:
//Run syncs the informer caches, then starts threadiness workers running runWorker,
//and blocks until stopCh is closed. On shutdown it shuts the workqueue down and
//waits for workers to finish the items they are processing
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
//Wait for the informer caches to sync before starting workers
if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.canariesSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
<-stopCh
return nil
}
·Run() launches threadiness goroutines that execute runWorker() concurrently.
·runWorker() simply loops over c.processNextWorkItem, continuously reading from the workqueue.
Next, the concrete logic of processNextWorkItem:
//processNextWorkItem reads a single item off the workqueue and attempts to process it by calling syncHandler
func (c *Controller) processNextWorkItem() bool {
startTimeNano := time.Now().UnixNano()
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
//Wrap in a function so we can defer c.workqueue.Done
err := func(obj interface{}) error {
//workqueue.Done tells the queue this item has been processed. If the item should
//not re-enter the queue, you must call workqueue.Forget; otherwise it re-enters
//the queue and comes back after the next back-off period
defer c.workqueue.Done(obj)
var key string
var ok bool
//Extract the string from obj; the string is namespace/name, which lets us
//fetch richer information from the informer's cache
if key, ok = obj.(string); !ok {
//Discard invalid items, otherwise we would loop retrying them
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
//Call syncHandler, passing the Canary's namespace/name
if err := c.syncHandler(key); err != nil {
//On error, requeue with rate limiting
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
//On success, call Forget to remove the item from the queue's rate limiter
c.workqueue.Forget(obj)
endTimeNano := time.Now().UnixNano()
costTimeNano := (endTimeNano - startTimeNano) / 1e6
klog.Infof("Successfully synced '%s'\ncostTime:%v\nthe workQueue len:%d\n", key, costTimeNano, c.workqueue.Len())
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
processNextWorkItem does the following.
·Takes an obj off the workqueue.
·Extracts the resource key from the obj and calls syncHandler to run the state-convergence logic.
·On success, calls Forget() to clear the obj from the workqueue.
·On failure, puts the obj back onto the workqueue with rate limiting.
Now let us look at the definition of syncHandler:
//syncHandler compares the actual state with the desired state, tries to converge
//the two, and updates the Canary's actual status accordingly
func (c *Controller) syncHandler(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
//Fetch the Canary by namespace/name
canary, err := c.canariesLister.Canaries(namespace).Get(name)
if err != nil {
//If the Canary no longer exists, stop processing and return
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("canary '%s' in work queue no longer exists", key))
return nil
}
return err
}
newDeployment, err := c.unmarshalDeployment(canary, "newDeploymentYaml")
if err != nil {
return err
}
klog.Infof("Synchandle deployment is %s/%s\n", newDeployment.Namespace, newDeployment.Name)
if newDeployment.Name == "" {
//Swallow this error; do not requeue
utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
return nil
}
var deployment *appsv1.Deployment
canaryType := canary.Spec.Info["type"]
switch canaryType {
case "NormalDeploy":
klog.Info("NormalDeploy")
deployment, err = c.normalDeploy(canary, newDeployment)
if err != nil {
return err
}
case "CanaryDeploy":
if canary.Spec.Info["currentBatch"] == "1" {
klog.Info("canaryDeploy")
deployment, err = c.firstCanaryDeploy(canary, newDeployment)
if err != nil {
return err
}
} else {
deployment, err = c.notFirstCanaryDeploy(canary, newDeployment)
if err != nil {
return err
}
}
case "CanaryRollback":
klog.Info("RollBack")
deployment, err = c.canaryRollback(canary, newDeployment)
if err != nil {
return err
}
default:
klog.Info("canaryType not match!")
return nil
}
err = c.updateCanaryStatus(canary, deployment)
if err != nil {
return err
}
c.recorder.Event(canary, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
return nil
}
This is the canary controller's core control logic, and it is tightly coupled to the business. An Operator builds on Kubernetes' resource and controller concepts while embedding application-specific domain knowledge.
Our canary controller supports two release modes.
·NormalDeploy: the native Deployment rolling update.
·CanaryDeploy: a canary release that splits the application's Pod instances into batches (the batch count may not exceed the total instance count). The first batch always pauses; for the remaining batches, the user chooses between clicking to release manually and letting the controller continue automatically.
enqueueCanary() is called when a Canary create or update event fires; it derives the resource key from the object via MetaNamespaceKeyFunc and puts it onto the workqueue:
func (c *Controller) enqueueCanary(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(key)
}
handleObject() is the handler invoked when a Deployment event fires:
func (c *Controller) handleObject(obj interface{}) {
var object metav1.Object
var ok bool
if object, ok = obj.(metav1.Object); !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}
object, ok = tombstone.Obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
return
}
klog.Infof("Recovered deleted object '%s' from tombstone", object.GetName())
}
klog.Infof("Processing object: %s\n", object.GetName())
if strings.HasPrefix(object.GetName(), "canary-") {
lastIndex := strings.LastIndex(object.GetName(), "-deployment-")
canaryName := object.GetName()[7:lastIndex] + "-canary"
canary, err := c.canariesLister.Canaries(object.GetNamespace()).Get(canaryName)
if err != nil {
klog.Infof("Canary get error, ignoring the object: %s", object.GetName())
klog.Infof("canaryName is %s, error:%s", canaryName, err)
return
}
c.enqueueCanary(canary)
} else {
klog.Infof("Not prefix canary-, ignore %s/%s\n", object.GetNamespace(), object.GetName())
}
//if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
//	if ownerRef.Kind != "Canary" {
//		return
//	}
//
//	canary, err := c.canariesLister.Canaries(object.GetNamespace()).Get(ownerRef.Name)
//	if err != nil {
//		klog.Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
//		return
//	}
//
//	c.enqueueCanary(canary)
//	return
//}
}
The main flow of handleObject() is as follows.
1) Get the resource object's name and check whether it starts with canary-, the prefix we give Deployments generated from a Canary. Admittedly this is not a great way to identify ownership; the correct approach is to use the ownerRef (see the commented-out code) to decide whether the Deployment is managed by a Canary. The prefix check is a historical artifact: before the Canary resource went live, many Deployments were not created through a Canary. Once every Deployment has been released through it at least once, the check switches to ownerRef.
2) If the check passes, call enqueueCanary() to put the Canary onto the workqueue.
The next three methods implement the handling for the type field in the canary YAML. First, the native rolling update:
func (c *Controller) normalDeploy(canary *canaryv1alpha1.Canary, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
deployment, err := c.deploymentsLister.Deployments(canary.Namespace).Get(newDeployment.Name)
//Create it if it does not exist
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset.AppsV1().Deployments(canary.Namespace).Create(newDeploymentWithOwner(canary, newDeployment))
klog.Infof("initDeploy return deployment: %s\n", deployment)
return deployment, err
}
//If Get/Create fails (for example, a network error), return the error so the
//obj is put back onto the queue and retried later
if err != nil {
return deployment, err
}
if deployment.Spec.Template.Spec.Containers[0].Image != newDeployment.Spec.Template.Spec.Containers[0].Image ||
*deployment.Spec.Replicas != *newDeployment.Spec.Replicas ||
deployment.Spec.Template.Labels["random"] != newDeployment.Spec.Template.Labels["random"] ||
deployment.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String() != newDeployment.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String() ||
deployment.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String() != newDeployment.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String() {
klog.Infof("image:%s, %s, replicas:%d, %d\n", deployment.Spec.Template.Spec.Containers[0].Image, newDeployment.Spec.Template.Spec.Containers[0].Image, *deployment.Spec.Replicas, *newDeployment.Spec.Replicas)
deployment, err = c.kubeclientset.AppsV1().Deployments(canary.Namespace).Update(newDeploymentWithOwner(canary, newDeployment))
klog.Info("image or replicas not equal update!")
}
//If Get/Update fails (for example, a network error), return the error so the
//obj is put back onto the queue and retried later
if err != nil {
return deployment, err
}
return deployment, nil
}
The main flow of normalDeploy() is as follows.
1) Check whether the Deployment exists in the cluster; create it if not.
2) If it exists, check whether the current Deployment's replica count, image, CPU, and memory configuration match what the Canary defines; if not, converge and update it to the state defined in the Canary.
3) If anything fails along the way, return the error; the layer above rate-limits and requeues the item.
The firstCanaryDeploy() method is defined as follows.
func (c *Controller) firstCanaryDeploy(canary *canaryv1alpha1.Canary, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
oldDeployment, err := c.unmarshalDeployment(canary, "oldDeploymentYaml")
if err != nil {
return oldDeployment, err
}
newDeploymentDesiredReplicas := *newDeployment.Spec.Replicas
*newDeployment.Spec.Replicas = 1
deployment, err := c.deploymentsLister.Deployments(canary.Namespace).Get(newDeployment.Name)
//Create it if it does not exist
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset.AppsV1().Deployments(canary.Namespace).Create(newDeploymentWithOwner(canary, newDeployment))
klog.Infof("canaryDeploy return deployment: %s\n", deployment)
return deployment, err
}
if err != nil {
return deployment, err
}
if *deployment.Spec.Replicas != 1 || deployment.Spec.Template.Spec.Containers[0].Image != newDeployment.Spec.Template.Spec.Containers[0].Image {
deployment, err = c.kubeclientset.AppsV1().Deployments(canary.Namespace).Update(newDeploymentWithOwner(canary, newDeployment))
klog.Info("firstCanaryDeploy: image or replicas not equal so update!")
if err != nil {
return deployment, err
}
}
if deployment.Status.AvailableReplicas == 1 {
*oldDeployment.Spec.Replicas = newDeploymentDesiredReplicas - 1
oldDeploymentJson, _ := json.Marshal(oldDeployment)
oldDeployment, err = c.kubeclientset.AppsV1().Deployments(oldDeployment.Namespace).Patch(oldDeployment.Name, types.MergePatchType, oldDeploymentJson)
if err != nil {
return oldDeployment, err
}
}
return deployment, err
}
firstCanaryDeploy() executes the first canary batch. Its main flow:
1) Check whether the Deployment exists in the cluster; create it if not.
2) If it exists, check whether the running new-version Deployment has exactly 1 replica; if not, converge it.
3) Check the old-version Deployment's replica count and converge both versions so that new-version available replicas + old-version available replicas = the application's defined total replica count.
4) Update the status field in real time, reporting the state of the first batch.
notFirstCanaryDeploy() handles every canary batch after the first:
//Canary release, batches after the first
func (c *Controller) notFirstCanaryDeploy(canary *canaryv1alpha1.Canary, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
oldDeployment, err := c.unmarshalDeployment(canary, "oldDeploymentYaml")
if err != nil {
return oldDeployment, err
}
newDeploymentDesiredReplicas := *newDeployment.Spec.Replicas
intCurrentBatch, err := strconv.Atoi(canary.Spec.Info["currentBatch"])
if err != nil {
return oldDeployment, err
}
intTotalBatches, err := strconv.Atoi(canary.Spec.Info["totalBatches"])
if err != nil {
return oldDeployment, err
}
currentBatch := int32(intCurrentBatch)
totalBatches := int32(intTotalBatches)
everyAddReplicas := int32(math.Floor(float64(newDeploymentDesiredReplicas)/float64(totalBatches) + 0.5))
//If only the first batch pauses, bump currentBatch and return; the next event continues the rollout
if canary.Spec.Info["pauseType"] == "First" && canary.Status.Info["batch"+canary.Spec.Info["currentBatch"]+"Status"] == "Finished" && canary.Spec.Info["currentBatch"] != canary.Spec.Info["totalBatches"] {
klog.Info("pauseType First add currentBatch")
canaryCopy := canary.DeepCopy()
canaryCopy.Spec.Info["currentBatch"] = strconv.Itoa(int(currentBatch + 1))
_, err = c.canaryclientset.CanarycontrollerV1alpha1().Canaries(canaryCopy.Namespace).Update(context.TODO(), canaryCopy, metav1.UpdateOptions{})
if err != nil {
klog.Infof("Update canary failed: %s", err)
return oldDeployment, err
}
deployment, err := c.deploymentsLister.Deployments(canary.Namespace).Get(newDeployment.Name)
return deployment, err
}
if totalBatches == currentBatch {
*newDeployment.Spec.Replicas = newDeploymentDesiredReplicas
} else {
*newDeployment.Spec.Replicas = 1 + (currentBatch-1)*everyAddReplicas
}
deployment, err := c.deploymentsLister.Deployments(canary.Namespace).Get(newDeployment.Name)
if err != nil {
return deployment, err
}
if *deployment.Spec.Replicas != *newDeployment.Spec.Replicas ||
deployment.Spec.Template.Spec.Containers[0].Image != newDeployment.Spec.Template.Spec.Containers[0].Image ||
deployment.Spec.Template.Labels["random"] != newDeployment.Spec.Template.Labels["random"] ||
deployment.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String() != newDeployment.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String() ||
deployment.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String() != newDeployment.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String() {
deployment, err = c.kubeclientset.AppsV1().Deployments(canary.Namespace).Update(newDeploymentWithOwner(canary, newDeployment))
klog.Info("notFirstCanaryDeployFirstPause: image or replicas not equal so update!")
if err != nil {
return deployment, err
}
}
*oldDeployment.Spec.Replicas = newDeploymentDesiredReplicas - deployment.Status.AvailableReplicas
if *oldDeployment.Spec.Replicas == 0 {
_, err := c.deploymentsLister.Deployments(oldDeployment.Namespace).Get(oldDeployment.Name)
//If it is already deleted, do nothing
if errors.IsNotFound(err) {
return deployment, nil
}
if err != nil {
return deployment, err
}
if err := c.kubeclientset.AppsV1().Deployments(oldDeployment.Namespace).Delete(oldDeployment.Name, &metav1.DeleteOptions{}); err != nil {
return deployment, err
}
} else {
oldDeploymentJson, _ := json.Marshal(oldDeployment)
oldDeployment, err = c.kubeclientset.AppsV1().Deployments(oldDeployment.Namespace).Patch(oldDeployment.Name, types.MergePatchType, oldDeploymentJson)
if err != nil {
return deployment, err
}
}
return deployment, err
}
notFirstCanaryDeploy() has the most complex logic in the whole canary controller.
1) Get the replica counts of the new and old versions currently in the cluster.
2) Get the release batches and pause policy defined in the Canary.
3) Dynamically compute, in real time, the available replicas of the new and old Deployments in the cluster, and, following the Canary's batches and policy, grow the new version's replicas while shrinking the old version's.
4) Update the status field in real time, reporting the rollout state.
canaryRollback() is the rollback method for canary releases:
func (c *Controller) canaryRollback(canary *canaryv1alpha1.Canary, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
oldDeployment, err := c.unmarshalDeployment(canary, "oldDeploymentYaml")
if err != nil {
return oldDeployment, err
}
newDeploymentDesiredReplicas := *newDeployment.Spec.Replicas
deployment, err := c.deploymentsLister.Deployments(canary.Namespace).Get(newDeployment.Name)
//Create it if it does not exist
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset.AppsV1().Deployments(canary.Namespace).Create(newDeploymentWithOwner(canary, newDeployment))
klog.Infof("canaryDeploy return deployment: %s\n", deployment)
return deployment, err
}
if err != nil {
return deployment, err
}
if *deployment.Spec.Replicas != *newDeployment.Spec.Replicas ||
deployment.Spec.Template.Spec.Containers[0].Image != newDeployment.Spec.Template.Spec.Containers[0].Image ||
deployment.Spec.Template.Labels["random"] != newDeployment.Spec.Template.Labels["random"] ||
deployment.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String() != newDeployment.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String() ||
deployment.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String() != newDeployment.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String() {
deployment, err = c.kubeclientset.AppsV1().Deployments(canary.Namespace).Update(newDeploymentWithOwner(canary, newDeployment))
klog.Info("canaryRollback: image or replicas not equal so update!")
if err != nil {
return deployment, err
}
}
//For a rollback within a batched release, do nothing here
if oldDeployment.Name == newDeployment.Name {
klog.Infof("oldDeployment.Name %s == newDeployment.Name %s, not delete oldDeployment", oldDeployment.Name, newDeployment.Name)
return deployment, nil
}
*oldDeployment.Spec.Replicas = newDeploymentDesiredReplicas - deployment.Status.AvailableReplicas
if *oldDeployment.Spec.Replicas == 0 {
_, err := c.deploymentsLister.Deployments(oldDeployment.Namespace).Get(oldDeployment.Name)
//If it is already deleted, do nothing
if errors.IsNotFound(err) {
return deployment, nil
}
if err != nil {
return deployment, err
}
if err := c.kubeclientset.AppsV1().Deployments(oldDeployment.Namespace).Delete(oldDeployment.Name, &metav1.DeleteOptions{}); err != nil {
return deployment, err
}
} else {
oldDeploymentJson, _ := json.Marshal(oldDeployment)
oldDeployment, err = c.kubeclientset.AppsV1().Deployments(oldDeployment.Namespace).Patch(oldDeployment.Name, types.MergePatchType, oldDeploymentJson)
if err != nil {
return deployment, err
}
}
return deployment, err
}
canaryRollback() has to handle rollback under several different conditions. Its main logic:
If the release has already completed, rolling back means creating the old version's Deployment and setting its replicas field to the true replica count. Two cases then arise.
·For a normal rolling update, nothing extra is done: a rolling update never produces two Deployments, so rolling the Deployment in newDeploymentYaml back to the old version is enough.
·For a canary release, the controller continuously computes the available replicas of the old version (the one being restored) and dynamically shrinks the new version (the one no longer wanted).
The status field is updated in real time to report rollback progress.
Before syncHandler() finishes, it calls updateCanaryStatus() to report the current release state:
func (c *Controller) updateCanaryStatus(canary *canaryv1alpha1.Canary, deployment *appsv1.Deployment) error {
//Never modify objects obtained from the informer; they are a read-only local cache.
//Work on a deep copy instead
canaryCopy := canary.DeepCopy()
canaryCopy.Status.Info = make(map[string]string)
canaryType := canary.Spec.Info["type"]
switch canaryType {
case "CanaryDeploy":
if *deployment.Spec.Replicas == deployment.Status.AvailableReplicas {
if canary.Spec.Info["totalBatches"] != canary.Spec.Info["currentBatch"] {
canaryCopy.Status.Info["batch"+canary.Spec.Info["currentBatch"]+"Status"] = "Finished"
} else {
oldDeployment, err := c.unmarshalDeployment(canary, "oldDeploymentYaml")
if err != nil {
return err
}
_, err = c.deploymentsLister.Deployments(oldDeployment.Namespace).Get(oldDeployment.Name)
//If the old Deployment is already deleted, mark the batch as finished
if err != nil {
if errors.IsNotFound(err) {
canaryCopy.Status.Info["batch"+canary.Spec.Info["currentBatch"]+"Status"] = "Finished"
} else {
return err
}
} else {
canaryCopy.Status.Info["batch"+canary.Spec.Info["currentBatch"]+"Status"] = "Ing"
}
}
} else {
canaryCopy.Status.Info["batch"+canary.Spec.Info["currentBatch"]+"Status"] = "Ing"
}
case "CanaryRollback":
oldDeployment, err := c.unmarshalDeployment(canary, "oldDeploymentYaml")
if err != nil {
return err
}
newDeployment, err := c.unmarshalDeployment(canary, "newDeploymentYaml")
if err != nil {
return err
}
_, err = c.deploymentsLister.Deployments(oldDeployment.Namespace).Get(oldDeployment.Name)
//If the old Deployment is already deleted, mark the rollback as finished
if err != nil {
if errors.IsNotFound(err) {
canaryCopy.Status.Info["rollbackStatus"] = "Finished"
} else {
return err
}
} else {
if oldDeployment.Name == newDeployment.Name {
canaryCopy.Status.Info["rollbackStatus"] = "Finished"
} else {
canaryCopy.Status.Info["rollbackStatus"] = "Ing"
}
}
case "NormalDeploy":
return nil
default:
klog.Info("canary.Spec.Info type not match!")
return nil
}
canaryCopy.Status.Info["availableReplicas"] = strconv.Itoa(int(deployment.Status.AvailableReplicas))
//Write the collected status back through the Canary clientset
_, err := c.canaryclientset.CanarycontrollerV1alpha1().Canaries(canaryCopy.Namespace).Update(context.TODO(), canaryCopy, metav1.UpdateOptions{})
return err
}
updateCanaryStatus() reports status for the following three cases.
·For a canary release, it fetches the current replica and available-replica counts of both versions in real time, and uses the release batches and policy to decide whether the release has finished; if so, the status is marked finished. The status key carries the batch number.
·For a rollback, it checks whether the unwanted version has been deleted and whether the version being restored has recovered to the application's defined replica count.
·For a rolling update, it does nothing.
The unmarshalDeployment() and newDeploymentWithOwner() methods are defined as follows.
//unmarshalDeployment extracts the Deployment stored in the canary
func (c *Controller) unmarshalDeployment(canary *canaryv1alpha1.Canary, newOrOld string) (*appsv1.Deployment, error) {
deployment := &appsv1.Deployment{}
deploymentInfo := []byte(canary.Spec.Info[newOrOld])
if err := json.Unmarshal(deploymentInfo, deployment); err != nil {
klog.Infof("%s/%s unmarshal failed: ", canary.Namespace, canary.Name)
klog.Info(err)
return deployment, err
}
return deployment, nil
}
func newDeploymentWithOwner(canary *canaryv1alpha1.Canary, deployment *appsv1.Deployment) *appsv1.Deployment {
deployment.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
*metav1.NewControllerRef(canary, canaryv1alpha1.SchemeGroupVersion.WithKind("Canary")),
}
return deployment
}
unmarshalDeployment() converts the string stored in the Canary into a Deployment struct.
newDeploymentWithOwner() sets the OwnerReferences attribute on the Deployment object being created, marking it as managed by the canary controller.
The canary controller's code as a whole is fairly involved: you must understand the usage scenarios and precisely control the rollout by computing replica counts, available replica counts, and the release type and policy. This degree of customization is exactly why Kubernetes does not ship canary releases as a built-in feature and instead leaves them for users to build themselves.