Skip to content

Commit

Permalink
Merge pull request #17 from pdettori/issue-16
Browse files Browse the repository at this point in the history
✨ add loki operator on OCP integration for shadow pods
  • Loading branch information
pdettori authored Jul 9, 2024
2 parents 47f2184 + fb25e23 commit a460780
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 48 deletions.
2 changes: 2 additions & 0 deletions charts/shadow-pods/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ spec:
- --metrics-bind-address=127.0.0.1:8080
- --leader-elect
- --shadow-pod-image={{ .Values.lokiLoggerImage.repository }}:{{ .Values.lokiLoggerImage.tag }}
- --loki-install-type={{ .Values.lokiInstallType }}
- --certs-secret={{ .Values.certsSecretName }}
ports:
- name: https
containerPort: 9443
Expand Down
6 changes: 6 additions & 0 deletions charts/shadow-pods/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ lokiLoggerImage:
# Overrides the image tag whose default is the chart appVersion.
tag: ""

# lokiInstallType: dev (for upstream kube install) or openshift (with loki operator)
lokiInstallType: "dev"

# name of secret with certificates for connecting to loki using TLS (it can be copied from the "openshift-logging" namespace)
certsSecretName: "logging-loki-querier-http"

imagePullSecrets: []
nameOverride: ""
fullnameOverride: ""
Expand Down
100 changes: 63 additions & 37 deletions shadow-pods/cmd/loki-logger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
Expand All @@ -27,7 +28,8 @@ import (
"sort"
"strconv"
"time"
"crypto/tls"

ctr "kubestellar/galaxy/shadow-pods/internal/controller"
)

const (
Expand Down Expand Up @@ -79,47 +81,52 @@ type Query struct {
TlsKeyFile string
}

// LogEntry represents the structure of JSON data in the OpenShift case.
type LogEntry struct {
Message string `json:"message"`
}

func (q *Query) Run(start string) (string, error) {
params := url.Values{}
params.Set("start", start)

if q.LokiInstallType == "openshift" {
if q.LokiInstallType == ctr.LokiInstallTypeOpenShift {
params.Set("query", fmt.Sprintf(`{kubernetes_pod_name="%s", kubernetes_namespace_name="%s", log_type="%s"} | json | hostname="%s" | kubernetes_container_name="%s"`,
q.Pod, q.Namespace, q.LogType, q.NodeName, q.Container))
q.Pod, q.Namespace, q.LogType, q.NodeName, q.Container))
} else {
params.Set("query", fmt.Sprintf(`{pod="%s",namespace="%s",container="%s",node_name="%s"}`,
q.Pod, q.Namespace, q.Container, q.NodeName))
q.Pod, q.Namespace, q.Container, q.NodeName))
}
params.Set("limit", limit)

queryUrl := fmt.Sprintf("%s/loki/api/v1/query_range?%s", q.URL, params.Encode())

req, err := http.NewRequest(http.MethodGet, queryUrl, nil)
if err != nil {
return "", fmt.Errorf("client: could not create request: %v", err)
}
if err != nil {
return "", fmt.Errorf("client: could not create request: %v", err)
}
client := &http.Client{}

if q.LokiInstallType == "openshift" {
req.Header.Set("X-Scope-OrgID", q.LogType)
if q.LokiInstallType == ctr.LokiInstallTypeOpenShift {
req.Header.Set("X-Scope-OrgID", q.LogType)
clientTLSCert, err := tls.LoadX509KeyPair(q.TlsCertFile, q.TlsKeyFile)
if err != nil {
return "", fmt.Errorf("Error loading certificate and key file: %v", err)
}
if err != nil {
return "", fmt.Errorf("error loading certificate and key file: %v", err)
}

tlsConfig := &tls.Config{
InsecureSkipVerify: true,
Certificates: []tls.Certificate{clientTLSCert},
}
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
Certificates: []tls.Certificate{clientTLSCert},
}

tr := &http.Transport{
TLSClientConfig: tlsConfig,
}
tr := &http.Transport{
TLSClientConfig: tlsConfig,
}

client = &http.Client{Transport: tr}
client = &http.Client{Transport: tr}
}

resp, err := client.Do(req)
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("error querying Loki: %w", err)
}
Expand All @@ -141,7 +148,7 @@ func main() {
loki_install_type := os.Getenv("LOKI_INSTALL_TYPE")
if loki_install_type == "" {
log.Fatal("LOKI_INSTALL_TYPE env variable is not set.")
} else if loki_install_type != "openshift" && loki_install_type != "dev" {
} else if loki_install_type != ctr.LokiInstallTypeOpenShift && loki_install_type != ctr.LokiInstallTypeDev {
log.Fatal("LOKI_INSTALL_TYPE has to be either `openshift` or `dev`.")
}

Expand All @@ -151,7 +158,7 @@ func main() {
log_type := os.Getenv("LOG_TYPE")
tls_cert_file := os.Getenv("TLS_CERT_FILE")
tls_key_file := os.Getenv("TLS_KEY_FILE")
if loki_install_type == "openshift" {
if loki_install_type == ctr.LokiInstallTypeOpenShift {
if log_type == "" {
log.Printf("LOG_TYPE not defined, using default: %s", defaultLogType)
log_type = defaultLogType
Expand All @@ -160,8 +167,8 @@ func main() {
log.Fatal("TLS_CERT_FILE env variable is not set.")
}
if tls_key_file == "" {
log.Fatal("TLS_KEY_FILE env variable is not set.")
}
log.Fatal("TLS_KEY_FILE env variable is not set.")
}
}

namespace := os.Getenv("POD_NAMESPACE")
Expand All @@ -187,12 +194,12 @@ func main() {

lokiBaseURL := os.Getenv("LOKI_BASE_URL")
if lokiBaseURL == "" {
if loki_install_type == "openshift" {
if loki_install_type == ctr.LokiInstallTypeOpenShift {
log.Printf("LOKI_BASE_URL not defined, using default: %s", defaultOpenshiftLokiBaseURL)
lokiBaseURL = defaultOpenshiftLokiBaseURL
} else {
log.Printf("LOKI_BASE_URL not defined, using default: %s", defaultDevLokiBaseURL)
lokiBaseURL = defaultDevLokiBaseURL
lokiBaseURL = defaultDevLokiBaseURL
}
} else {
log.Printf("LOKI_BASE_URL: %s", lokiBaseURL)
Expand Down Expand Up @@ -233,16 +240,16 @@ func main() {
}

query := Query{
URL: lokiBaseURL,
Namespace: namespace,
NodeName: hostName,
Pod: pod,
Container: container,
Limit: limit,
URL: lokiBaseURL,
Namespace: namespace,
NodeName: hostName,
Pod: pod,
Container: container,
Limit: limit,
LokiInstallType: loki_install_type,
LogType: log_type,
TlsCertFile: tls_cert_file,
TlsKeyFile: tls_key_file,
LogType: log_type,
TlsCertFile: tls_cert_file,
TlsKeyFile: tls_key_file,
}

initialStartTime := fmt.Sprintf("%d", time.Now().Add(-initialTimeInterval).UnixNano())
Expand Down Expand Up @@ -283,7 +290,14 @@ func main() {

// Print the sorted slice
for _, value := range values {
fmt.Printf("%s\n", value[1])
logMessage := value[1]
if loki_install_type == ctr.LokiInstallTypeOpenShift {
logMessage, err = extractMessage(value[1])
if err != nil {
log.Fatalf("Failed to extract message: %v", err)
}
}
fmt.Printf("%s\n", logMessage)
}

time.Sleep(timeInterval)
Expand All @@ -309,3 +323,15 @@ func incrememtTimestampString(tsMs string, increment int64) (string, error) {
}
return fmt.Sprintf("%d", ts+increment), nil
}

func extractMessage(jsonStr string) (string, error) {
var entry LogEntry

// Unmarshal the JSON string into the LogEntry struct
err := json.Unmarshal([]byte(jsonStr), &entry)
if err != nil {
return "", err
}

return entry.Message, nil
}
13 changes: 10 additions & 3 deletions shadow-pods/cmd/shadow-pods/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func main() {
var secureMetrics bool
var enableHTTP2 bool
var shadowPodImage string
var lokiInstallType string
var certsSecretName string
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
Expand All @@ -68,6 +70,9 @@ func main() {
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
flag.StringVar(&shadowPodImage, "shadow-pod-image", "", "The image to use for the shadow pod.")
flag.StringVar(&lokiInstallType, "loki-install-type", "dev", "The loki install type - openshift or dev")
flag.StringVar(&certsSecretName, "certs-secret", "logging-loki-querier-http",
"The name of the secret with the certs to connect to loki, used only for openshit install type")
opts := zap.Options{
Development: true,
}
Expand Down Expand Up @@ -125,9 +130,11 @@ func main() {
}

if err = (&controller.WorkflowReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
PodImage: shadowPodImage,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
PodImage: shadowPodImage,
LokiInstallType: lokiInstallType,
CertsSecretName: certsSecretName,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Workflow")
os.Exit(1)
Expand Down
1 change: 0 additions & 1 deletion shadow-pods/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.21
require (
github.com/argoproj/argo-workflows/v3 v3.5.6
github.com/onsi/ginkgo/v2 v2.14.0
github.com/onsi/gomega v1.30.0
k8s.io/api v0.29.2
k8s.io/apimachinery v0.29.2
k8s.io/client-go v0.29.2
Expand Down
65 changes: 58 additions & 7 deletions shadow-pods/internal/controller/workflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package controller
import (
"context"
"fmt"
"path/filepath"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand All @@ -35,15 +37,21 @@ import (
)

const (
schedulingLabelKey = "kubestellar.io/cluster"
hostingClusterName = "local"
schedulingLabelKey = "kubestellar.io/cluster"
hostingClusterName = "local"
LokiInstallTypeOpenShift = "openshift"
LokiInstallTypeDev = "dev"
certsVolume = "certs"
certsMountPath = "/certs"
)

// WorkflowReconciler reconciles a Workflow object
type WorkflowReconciler struct {
client.Client
Scheme *runtime.Scheme
PodImage string
Scheme *runtime.Scheme
PodImage string
LokiInstallType string
CertsSecretName string
}

// PodInfo is used to hold the relevant info for each pod in the workflow status
Expand Down Expand Up @@ -110,7 +118,7 @@ func (r *WorkflowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}

if !isWorkflowCompletedOrError(workflow.Status.Phase) {
pod := generatePodTemplate(podInfo, r.PodImage)
pod := generatePodTemplate(podInfo, r.PodImage, r.LokiInstallType, r.CertsSecretName)
if err := controllerutil.SetControllerReference(workflow, pod, r.Scheme); err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -176,8 +184,8 @@ func splitString(input string) (firstPart string, lastPart string) {
return firstPart, lastPart
}

func generatePodTemplate(podInfo PodInfo, image string) *corev1.Pod {
return &corev1.Pod{
func generatePodTemplate(podInfo PodInfo, image, lokiInstallType, CertsSecretName string) *corev1.Pod {
podTemplate := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podInfo.Name,
Namespace: podInfo.Namespace,
Expand Down Expand Up @@ -206,6 +214,49 @@ func generatePodTemplate(podInfo PodInfo, image string) *corev1.Pod {
},
},
}
if lokiInstallType == LokiInstallTypeOpenShift {
podTemplate.Spec.Volumes = []corev1.Volume{
{
Name: certsVolume,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: CertsSecretName,
},
},
},
}
podTemplate.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{
{
Name: certsVolume,
ReadOnly: true,
MountPath: certsMountPath,
},
}
extraEnv := []corev1.EnvVar{
{
Name: "LOKI_INSTALL_TYPE",
Value: LokiInstallTypeOpenShift,
},
{
Name: "TLS_CERT_FILE",
Value: filepath.Join(certsMountPath, "tls.crt"),
},
{
Name: "TLS_KEY_FILE",
Value: filepath.Join(certsMountPath, "tls.key"),
},
}
podTemplate.Spec.Containers[0].Env = append(podTemplate.Spec.Containers[0].Env, extraEnv...)

podTemplate.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{
AllowPrivilegeEscalation: ptr.To(false),
RunAsNonRoot: ptr.To(true),
SeccompProfile: &corev1.SeccompProfile{
Type: corev1.SeccompProfileTypeRuntimeDefault,
},
}
}
return podTemplate
}

func checkPodExists(kClient client.Client, ctx context.Context, podInfo PodInfo) (bool, error) {
Expand Down

0 comments on commit a460780

Please sign in to comment.