+++ title = 'K8s Client Sdk Patterns I Use' date = 2024-12-15T11:57:05Z +++
K8s Client SDK Patterns I Use
Every now and then I need to write a program that runs as a daemon process and interacts with k8s cluster to perform pods such as:
- Operating as a HTTP facade to the k8s resources for other services outside of the k8s cluster.
- Running as a daemonset to automate some of the node level domain specific pods (e.g. prepulling images).
Given the nature of the pods these deamons often comes with aggressive queries to the k8s API server, such as trying to poll status or CRUD operations on resources.
This comes with a few challenges:
- You don't want your k8s client to bombard the K8s API server.
- You probably also don't want your k8s client to hit the priority and fairness limit imposed by the API server.
By default if you use the kubebuilder or the underlying controller-runtime, you will get these features out of the box, however the daemon process does not necessarily fit into the "reconcile loop" pattern. That being said I'd like to reuse controller-runtime's k8s client sdk for the daemon process instead of reinventing the wheel and solving all the edge cases.
In this post I will share some of the patterns I use to make the k8s client sdk work for a custom daemon process.
Before we start
You should not use this pattern when:
- You are just writing a simple CLI tool that interacts with k8s cluster that fire and forget. Since it heavily relies on the client side cache and the lister pattern, you will end up with cache-sync on every command run which is not efficient.
- Your program is designed to deployed en-masse to a k8s cluster. This is because it opens a lot of long running connections to the k8s API server, which generates a lot of load.
How it works
The diagram below shows how the client works:
flowchart LR
subgraph ControllerRuntime["Controller Runtime"]
Client[Client]
Cache[(Cache Store)]
Informer[Informer Factory]
end
subgraph ClientGo["Client-go"]
RestClient[REST Client]
ListerWatcher[Lister/Watcher]
Watch[Watch Interface]
end
API[K8s API Server]
%% Controller Runtime connections
Client -->|Delegate| RestClient
Client -->|Read| Cache
Informer -->|Create| ListerWatcher
%% Client-go connections
RestClient -->|Write| API
ListerWatcher -->|List/Watch| API
%% Watch events flow
API -->|Events Stream| Watch
Watch -->|Update| Cache
classDef subgraphStyle fill:#f5f5f5,stroke:#333,stroke-width:2px
class ControllerRuntime,ClientGo subgraphStyle
The boilerplate code
The code itself somewhat looks like this:
import (
"context"
"errors"
"fmt"
"log"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
goscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)
type Service struct {
client client.Client
}
func NewService(ctx context.Context) (*Service, error) {
scheme := runtime.NewScheme()
if err := goscheme.AddToScheme(scheme); err != nil {
return nil, err
}
// ask the client to add the scheme for the resources you need
// corev1 is being added here, you can also add custom resource definition as well if
// you have the scheme defined in Golang.
if err := corev1.AddToScheme(scheme); err != nil {
return nil, err
}
// load the kubeconfig from varies sources such as:
// 1. kubeconfig file
// 2. in-cluster config
// 3. service account token
// This code probably saves you a few hours of time ;)
config := ctrl.GetConfigOrDie()
// create a cache for the client to use
ca, err := cache.New(config, cache.Options{Scheme: scheme})
if err != nil {
return nil, err
}
// start the cache to sync the resources
go func() {
if err := ca.Start(ctx); err != nil {
log.Println("cache error", err)
}
}()
// wait for the cache to sync the resources
if waited := ca.WaitForCacheSync(ctx); !waited {
return nil, errors.New("cache sync failed")
}
// create a new client with the cache
c, err := client.New(config, client.Options{
Cache: &client.CacheOptions{
Reader: ca,
},
})
if err != nil {
return nil, err
}
return &Service{client: c}, nil
}
What happens here is that:
- On startup, we create a new client with the cache. The client starts a long running HTTP2 connection with the k8s API server.
- Through the long running HTTP2 connection, the client syncs all the resources to the cache, and on resource change, the client will be notified of the change from the API server and update the cache accordingly.
- The client will then use the cache to read the resources, which shed a lot of load from the API server vs making a direct HTTP request to the API server.
- For write operations, the client will make a direct HTTP request to the API server, and the API server will send the request to the node where the daemon process is running.
- At the event of k8s server being unavailable, the client is intelligent enough to wait for the server to be ready and then re-sync the resources from the API server again, which I find very neat.
Historically the operation is initialised via client.NewDelegatingClient
but it was removed from the controller runtime through this PR.
How to use the client
The following example shows how to use the client as a http facade.
func (s *Service) GetPod(g *gin.Context) {
var (
namespace = g.Param("namespace")
name = g.Param("name")
ctx = g.Request.Context()
reqLog = logger.G(ctx)
pod = &corev1.Pod{}
)
if err := s.client.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, pod); err != nil {
reqLog.WithError(err).Error("failed to get pod")
if apierrors.IsNotFound(err) {
g.JSON(http.StatusNotFound, "not found")
} else {
g.JSON(http.StatusInternalServerError, "internal server error")
}
}
g.JSON(http.StatusOK, pod)
}
Conventionally when you run s.client.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, pod)
you'd exect the client to make a direct HTTP request to the k8s API server, however in this case the client will use the cache to read the resource from the cache instead.
To create a pod you can
func (s *Service) CreatePod(g *gin.Context) {
var (
namespace = g.Param("namespace")
pod = corev1.Pod{}
ctx = g.Request.Context()
reqLog = logger.G(ctx)
)
if err := g.ShouldBindJSON(&pod); err != nil {
reqLog.WithError(err).Error("failed to bind pod")
g.JSON(http.StatusBadRequest, err.Error())
return
}
pod.Namespace = namespace
if err := s.client.Create(ctx, &pod); err != nil {
reqLog.WithError(err).Error("failed to create pod")
g.JSON(http.StatusBadRequest, err.Error())
return
}
g.JSON(http.StatusCreated, pod)
}
In the case above at the event of s.client.Create(ctx, &pod)
the client will make a direct HTTP request to the k8s API server.
As far as I know the write operation doesn't come with any rate limiting protection. From the controller-runtime perspective it comes with it's own worker-queue rate limiter but it is for reconcilation loop only. That being said it's pretty trivial to implement a rate limiter for the write operation.
How to test it
As you can see the client itself is fairly easy to use, however due to the nature of its complexity test it is not very straightforward to raw-dog in plain Golang.
Fortunately the controller-runtime also provides a easy way to fake the client. It looks like this:
package main
import (
"context"
"testing"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
func TestServiceGetPod(t *testing.T) {
scheme := runtime.NewScheme()
if err := corev1.AddToScheme(scheme); err != nil {
t.Fatalf("failed to add corev1 scheme: %v", err)
}
clientBuilder := fake.NewClientBuilder().WithScheme(scheme).WithObjects(
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "default",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-container",
Image: "test-image",
},
},
},
},
)
client := clientBuilder.Build()
if client == nil {
t.Fatalf("failed to build client")
}
var pod corev1.Pod
err := client.Get(context.Background(), types.NamespacedName{Name: "test-pod", Namespace: "default"}, &pod)
if err != nil {
t.Fatalf("failed to get pod: %v", err)
}
if pod.Name != "test-pod" {
t.Fatalf("expected pod name to be test-pod, got %s", pod.Name)
}
}
Conclusion
In this post we've covered:
- The mechanism of how the controller-runtime's k8s cached client works.
- How to use the client for a daemon process.
- How to test the client.
I hope you find this post useful ;).