Start working on the kafka stack

Daniel Berteaud 2024-04-03 12:16:16 +02:00
parent 20965a8398
commit cc595f78ad
23 changed files with 996 additions and 0 deletions

4
bundles.yml Normal file

@ -0,0 +1,4 @@
---
dependencies:
- url: ../common.git

23
images/jikkou/Dockerfile Normal file

@ -0,0 +1,23 @@
FROM [[ .docker.repo ]][[ .docker.base_images.alpine.image ]] AS builder
ARG JIKKOU_VERSION=[[ .kafka.jikkou.version ]]
ADD https://github.com/streamthoughts/jikkou/releases/download/v${JIKKOU_VERSION}/jikkou-${JIKKOU_VERSION}-linux-x86_64.tar.gz /tmp
ADD https://github.com/streamthoughts/jikkou/releases/download/v${JIKKOU_VERSION}/checksums_sha256.txt /tmp
RUN set -euxo pipefail &&\
apk --no-cache add tar gzip &&\
cd /tmp &&\
grep "jikkou-${JIKKOU_VERSION}-linux-x86_64.tar.gz" checksums_sha256.txt | sha256sum -c &&\
tar xvf jikkou-${JIKKOU_VERSION}-linux-x86_64.tar.gz &&\
mv jikkou-${JIKKOU_VERSION}-linux-x86_64 /opt/jikkou &&\
rm -f /opt/jikkou/bin/jikkou.bat
FROM [[ .docker.repo ]][[ .docker.base_images.java17.image ]]
MAINTAINER [[ .docker.maintainer ]]
ENV PATH=/opt/jikkou/bin:$PATH
COPY --from=builder /opt/jikkou/ /opt/jikkou/
RUN set -euxo pipefail &&\
apk --no-cache add openssl gcompat
CMD ["jikkou"]


@ -0,0 +1,63 @@
FROM [[ .docker.repo ]][[ .docker.base_images.java17.image ]] AS builder
ARG KAFKA_VERSION=[[ .kafka.broker.version ]] \
SCALA_VERSION=2.13
ADD https://www.apache.org/dyn/closer.cgi?action=download&filename=kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
ADD https://www.apache.org/dyn/closer.cgi?action=download&filename=kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz.asc /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz.asc
ADD https://downloads.apache.org/kafka/KEYS /tmp/
RUN set -ex &&\
apk add --no-cache \
curl \
ca-certificates \
gnupg \
tar \
gzip \
&&\
cd /tmp &&\
export GNUPGHOME="$(mktemp -d)" &&\
gpg --import KEYS &&\
gpg --batch --verify kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz.asc kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz &&\
tar xvf kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz &&\
mv kafka_${SCALA_VERSION}-${KAFKA_VERSION} /opt/kafka &&\
rm -rf /opt/kafka/bin/windows /opt/kafka/site-docs
FROM [[ .docker.repo ]][[ .docker.base_images.java17.image ]]
MAINTAINER [[ .docker.maintainer ]]
ENV PATH=$PATH:/opt/kafka/bin \
KAFKA_DATA_DIR=/data \
KAFKA_CONF_DIR=/opt/kafka/config \
KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" \
KAFKA_GC_LOG_OPTS=" " \
LOG_DIR=/logs
COPY --from=builder /opt/kafka/ /opt/kafka/
RUN set -euxo pipefail &&\
apk --no-cache add \
bash \
openssl \
&&\
addgroup -g 9092 kafka &&\
adduser --system \
--ingroup kafka \
--disabled-password \
--uid 9092 \
--home /opt/kafka \
--no-create-home \
--shell /sbin/nologin \
kafka &&\
mkdir /data /logs &&\
chown kafka.kafka /data /logs &&\
chmod 700 /data /logs
COPY root/ /
RUN set -euxo pipefail &&\
chown kafka.kafka /opt/kafka/config/server.properties &&\
chmod 640 /opt/kafka/config/server.properties
EXPOSE 9092
USER kafka
CMD ["bash", "-c", "exec kafka-server-start.sh ${KAFKA_CONF_DIR}/server.properties"]


@ -0,0 +1,11 @@
#!/bin/sh
set -eo pipefail
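# Render environment variables (e.g. KAFKA_DATA_DIR) into server.properties in place,
# but only when the file is writable by the current user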
if [ -w "${KAFKA_CONF_DIR}/server.properties" ]; then
umask 077
TMPCONF=$(mktemp)
envsubst < ${KAFKA_CONF_DIR}/server.properties > ${TMPCONF}
cat ${TMPCONF} > ${KAFKA_CONF_DIR}/server.properties
rm -f ${TMPCONF}
fi


@ -0,0 +1,6 @@
#!/bin/sh
set -eo pipefail
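# Format the KRaft storage directory when a cluster ID is provided.
# --ignore-formatted makes this a no-op if the storage has already been formatted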
if [ -n "${KAFKA_CLUSTER_ID}" ]; then
kafka-storage.sh format --cluster-id ${KAFKA_CLUSTER_ID} --config ${KAFKA_CONF_DIR}/server.properties --ignore-formatted
fi


@ -0,0 +1,15 @@
#!/bin/sh
set -euo pipefail
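# Resolve a DNS SRV record (first argument) and print the discovered endpoints
# as a comma separated list of address:port pairs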
SRV=$1
OUTPUT=""
IFS=$'\n'
for SERVER in $(nslookup -type=srv ${SRV} 2>&1 | awk 'f;/Non-authoritative/{f=1}' | sed '$d'); do
PORT=$(echo -n ${SERVER} | awk '{ print $6 }')
CNAME=$(echo -n ${SERVER} | awk '{ print $7 }')
ADDR=$(nslookup -type=a ${CNAME} 2>&1 | awk -F': ' 'NR==6 { print $2 }')
OUTPUT="${OUTPUT}${ADDR}:${PORT},"
done
echo $(echo -n ${OUTPUT} | sed 's/,$//')


@ -0,0 +1,103 @@
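# JMX exporter rules translating Kafka MBeans into Prometheus metrics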
lowercaseOutputName: true
rules:
# Special cases and very specific rules
- pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
name: kafka_server_$1_$2
type: GAUGE
labels:
clientId: "$3"
topic: "$4"
partition: "$5"
- pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
name: kafka_server_$1_$2
type: GAUGE
labels:
clientId: "$3"
broker: "$4:$5"
- pattern : kafka.coordinator.(\w+)<type=(.+), name=(.+)><>Value
name: kafka_coordinator_$1_$2_$3
type: GAUGE
# Generic per-second counters with 0-2 key/value pairs
- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
name: kafka_$1_$2_$3_total
type: COUNTER
labels:
"$4": "$5"
"$6": "$7"
- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
name: kafka_$1_$2_$3_total
type: COUNTER
labels:
"$4": "$5"
- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
name: kafka_$1_$2_$3_total
type: COUNTER
- pattern: kafka.server<type=(.+), client-id=(.+)><>([a-z-]+)
name: kafka_server_quota_$3
type: GAUGE
labels:
resource: "$1"
clientId: "$2"
- pattern: kafka.server<type=(.+), user=(.+), client-id=(.+)><>([a-z-]+)
name: kafka_server_quota_$4
type: GAUGE
labels:
resource: "$1"
user: "$2"
clientId: "$3"
# Generic gauges with 0-2 key/value pairs
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
"$6": "$7"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
- pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
name: kafka_$1_$2_$3
type: GAUGE
# Emulate Prometheus 'Summary' metrics for the exported 'Histogram's.
#
# Note that these are missing the '_sum' metric!
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
name: kafka_$1_$2_$3_count
type: COUNTER
labels:
"$4": "$5"
"$6": "$7"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
"$6": "$7"
quantile: "0.$8"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
name: kafka_$1_$2_$3_count
type: COUNTER
labels:
"$4": "$5"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
quantile: "0.$6"
- pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
name: kafka_$1_$2_$3_count
type: COUNTER
- pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
name: kafka_$1_$2_$3
type: GAUGE
labels:
quantile: "0.$4"


@ -0,0 +1,4 @@
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n


@ -0,0 +1,132 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present. See config/kraft/README.md for details.
#
############################# Server Basics #############################
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093
############################# Socket Server Settings #############################
# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=${KAFKA_DATA_DIR}
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000


@ -0,0 +1,21 @@
FROM [[ .docker.repo ]][[ .docker.base_images.alpine.image ]] AS builder
ARG EXPORTER_VERSION=[[ .kafka.exporter.version ]]
ADD https://github.com/danielqsj/kafka_exporter/releases/download/v${EXPORTER_VERSION}/kafka_exporter-${EXPORTER_VERSION}.linux-amd64.tar.gz /tmp
RUN set -euxo pipefail &&\
cd /tmp &&\
apk --no-cache add tar &&\
tar xvzf kafka_exporter-${EXPORTER_VERSION}.linux-amd64.tar.gz &&\
mv kafka_exporter-${EXPORTER_VERSION}.linux-amd64/kafka_exporter /usr/local/bin/kafka_exporter &&\
chown root:root /usr/local/bin/kafka_exporter &&\
chmod 755 /usr/local/bin/kafka_exporter
FROM [[ .docker.repo ]][[ .docker.base_images.alpine.image ]]
MAINTAINER [[ .docker.maintainer ]]
COPY --from=builder /usr/local/bin/kafka_exporter /usr/local/bin/kafka_exporter
CMD ["kafka_exporter"]

45
init/kafka-broker-pki Executable file

@ -0,0 +1,45 @@
#!/bin/sh
set -e
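# Set up the PKI secret engine (common/vault.mkpki.sh) and create the certificate
# issuing roles for the brokers, the metrics exporter and Jikkou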
[[- $c := merge .kafka .]]
[[ template "common/vault.mkpki.sh" $c ]]
# Role for the brokers
vault write [[ $c.vault.pki.path ]]/roles/[[ .instance ]]-broker \
allowed_domains="[[ .instance ]]-broker[[ .consul.suffix ]],[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]]" \
allow_bare_domains=true \
allow_subdomains=true \
allow_localhost=true \
allow_ip_sans=true \
server_flag=true \
client_flag=true \
allow_wildcard_certificates=false \
max_ttl=720h \
ou="[[ $c.vault.pki.ou ]]"
# Role for the prometheus exporter
vault write [[ $c.vault.pki.path ]]/roles/[[ .instance ]]-exporter \
allowed_domains="[[ .instance ]]-exporter" \
allow_bare_domains=true \
allow_subdomains=false \
allow_localhost=false \
allow_ip_sans=false \
server_flag=false \
client_flag=true \
allow_wildcard_certificates=false \
max_ttl=72h \
ou="[[ $c.vault.pki.ou ]]"
# Role for Jikkou (topic and ACL management tool)
vault write [[ $c.vault.pki.path ]]/roles/[[ .instance ]]-jikkou \
allowed_domains="[[ .instance ]]-jikkou" \
allow_bare_domains=true \
allow_subdomains=false \
allow_localhost=false \
allow_ip_sans=false \
server_flag=false \
client_flag=true \
allow_wildcard_certificates=false \
max_ttl=1h \
ou="[[ $c.vault.pki.ou ]]"

237
kafka-brokers.nomad.hcl Normal file

@ -0,0 +1,237 @@
job "[[ .instance ]]-brokers" {
[[- $c := merge .kafka . ]]
[[ template "common/job_start" . ]]
group "broker" {
[[ $c := merge $c.broker $c ]]
count = [[ $c.count ]]
shutdown_delay = "6s"
[[ template "common/constraints" $c ]]
network {
mode = "bridge"
port "client" {
[[- if has $c "static_client_port" ]]
static = [[ $c.static_client_port ]]
[[- end ]]
}
port "broker" {
[[- if has $c "static_broker_port" ]]
static = [[ $c.static_broker_port ]]
[[- end ]]
}
[[- if conv.ToBool $c.prometheus.enabled ]]
port "metrics" {}
[[- end ]]
}
[[ template "common/volumes" $c ]]
service {
name = "[[ .instance ]]-broker[[ .consul.suffix ]]"
port = "client"
[[ template "common/service_meta" $c ]]
[[ template "common/connect" $c ]]
# Check that the broker is ready
# See the meaning of each broker state here: https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/BrokerState.java
check {
name = "health"
type = "script"
command = "sh"
args = [
"-c",
"STATE=$(curl http://127.0.0.1:9404 | grep '^kafka_server_kafkaserver_brokerstate' | sed -r 's/.* (\\d+)\\.0/\\1/'); echo \"Broker state: ${STATE}\"; if [ \"${STATE}\" = \"3\" ]; then exit 0; else exit 1; fi"
]
interval = "30s"
timeout = "8s"
task = "kafka-broker"
}
tags = [
"broker-${NOMAD_ALLOC_INDEX}"
]
}
[[ template "common/task.wait_for" $c ]]
task "kafka-broker" {
driver = "[[ $c.nomad.driver ]]"
leader = true
config {
image = "[[ $c.image ]]"
readonly_rootfs = true
pids_limit = 500
volumes = [
"local/conf/server.properties:/opt/kafka/config/server.properties:ro",
"local/conf/client.properties:/opt/kafka/config/client.properties:ro"
]
}
[[ template "common/artifacts" $c ]]
[[ template "common/vault.policies" $c ]]
env {
# The PEM bundle issued by Vault will be converted to PKCS12 for Kafka to use
PEM_KEY_FILE = "/secrets/kafka.bundle.pem"
PEM_CERT_FILE = "/secrets/kafka.bundle.pem"
P12_FILE = "/secrets/kafka.p12"
}
[[ template "common/file_env" $c ]]
# Environment variables for JVM heap sizing and the JMX exporter agent
template {
data = <<_EOT
KAFKA_HEAP_OPTS=-Xmx[[ $c.resources.memory | mul 0.3 | math.Ceil ]]m -Xms[[ $c.resources.memory | mul 0.3 | math.Ceil ]]m
KAFKA_OPTS=-javaagent:/jmx_exporter/jmx_prometheus_javaagent.jar=127.0.0.1:9404:/opt/kafka/config/jmx_exporter.yml
[[- if ne $c.zookeeper.user "" ]] -Djava.security.auth.login.config=/secrets/jaas.conf[[- end ]]
_EOT
destination = "secrets/.kafka.env"
perms = 400
env = true
}
[[- if ne $c.zookeeper.user "" ]]
# SASL credentials used to authenticate against ZooKeeper
template {
data = <<_EOT
[[ template "kafka/brokers/jaas.conf" $c ]]
_EOT
destination = "secrets/jaas.conf"
uid = 100000
gid = 109092
perms = "0640"
}
[[- end ]]
# The main broker configuration
template {
data = <<_EOT
[[ template "kafka/brokers/server.properties" $c ]]
_EOT
destination = "local/conf/server.properties"
}
# A configuration to be used by client tools
template {
data = <<_EOT
[[ template "kafka/brokers/client.properties" $c ]]
_EOT
destination = "local/conf/client.properties"
# No need to restart if the client conf changes
change_mode = "noop"
}
# The certificate and private key for the broker
template {
data = <<_EOT
{{- with pkiCert "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-broker"
"common_name=[[ .instance ]]-broker[[ .consul.suffix ]]"
(printf "alt_names=broker-%s.[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]],[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]],localhost" (env "NOMAD_ALLOC_INDEX"))
(printf "ip_sans=%s,127.0.0.1" (env "NOMAD_HOST_IP_client"))
(printf "ttl=%dh" (env "NOMAD_ALLOC_INDEX" | parseInt | multiply 24 | add 650)) }}
{{ .Cert }}
{{ .Key }}
{{- end }}
_EOT
destination = "secrets/kafka.bundle.pem"
uid = 100000
gid = 109092
perms = "0640"
}
# The trusted CA for the broker
template {
data = <<_EOT
{{ with secret "[[ $c.vault.pki.path ]]/cert/ca_chain" }}{{ .Data.ca_chain }}{{ end }}
_EOT
destination = "secrets/kafka.ca.pem"
uid = 100000
gid = 100000
}
# Mount the persistent volume on /data
volume_mount {
volume = "data"
destination = "/data"
}
[[ template "common/resources" $c ]]
}
}
[[- if conv.ToBool $c.prometheus.enabled ]]
[[- $c := merge .kafka.exporter $c ]]
group "exporter" {
network {
mode = "bridge"
port "metrics" {}
}
service {
name = "[[ .instance ]]-exporter[[ .consul.suffix ]]"
[[ template "common/service_meta" $c ]]
}
[[ template "common/task.wait_for" $c ]]
task "kafka-exporter" {
driver = "[[ $c.nomad.driver ]]"
user = 9308
config {
image = "[[ $c.image ]]"
readonly_rootfs = true
pids_limit = 100
command = "/local/kafka_exporter"
}
[[ template "common/artifacts" $c ]]
[[ template "common/vault.policies" $c ]]
[[ template "common/file_env" $c ]]
template {
data = <<_EOT
[[ template "kafka/exporter/start.sh" $c ]]
_EOT
destination = "local/kafka_exporter"
perms = "0755"
}
[[ template "common/metrics_cert" $c ]]
# Ask Vault for a client certificate to connect to the brokers
template {
data = <<_EOT
{{- with pkiCert "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-exporter"
"common_name=[[ .instance ]]-exporter" "ttl=72h" }}
{{ .Cert }}
{{ .Key }}
{{- end }}
_EOT
destination = "secrets/kafka.bundle.pem"
uid = 109308
gid = 100000
perms = "0400"
}
# The CA chain used to validate the Kafka brokers
template {
data = <<_EOT
{{ with secret "[[ $c.vault.pki.path ]]/cert/ca_chain" }}{{ .Data.ca_chain }}{{ end }}
_EOT
destination = "local/kafka.ca.pem"
}
}
}
[[- end ]]
}

122
kafka-jikkou.nomad.hcl Normal file

@ -0,0 +1,122 @@
job "[[ .instance ]]-jikkou" {
[[- $c := merge .kafka . ]]
[[ template "common/job_start" $c ]]
type = "batch"
meta {
# Force job to be different for each execution
run_uuid = "${uuidv4()}"
}
group "jikkou" {
[[- $c := merge $c.jikkou $c ]]
network {
mode = "bridge"
}
[[ template "common/task.wait_for" $c ]]
task "jikkou" {
driver = "[[ $c.nomad.driver ]]"
config {
image = "[[ $c.image ]]"
readonly_rootfs = true
pids_limit = 100
command = "/local/jikkou"
}
[[ template "common/vault.policies" $c ]]
env {
PEM_KEY_FILE = "/secrets/jikkou.bundle.pem"
PEM_CERT_FILE = "/secrets/jikkou.bundle.pem"
P12_FILE = "/secrets/jikkou.p12"
}
[[ template "common/file_env" $c ]]
template {
data = <<_EOT
KAFKA_BOOTSTRAP_SERVERS=
{{- range $index, $instance := service "[[ .instance ]]-broker[[ .consul.suffix ]]" -}}
{{ if ne $index 0 }},{{ end }}{{ $instance.Address }}:{{ $instance.Port }}
{{- end -}}
_EOT
change_mode = "noop"
destination = "secrets/.jikkou.env"
env = true
}
# A wrapper script to apply the topic and ACL definitions
template {
data = <<_EOT
[[ template "kafka/jikkou/jikkou" $c ]]
_EOT
destination = "local/jikkou"
perms = 755
}
# The jikkou configuration file
template {
data = <<_EOT
[[ template "kafka/jikkou/jikkou.conf" $c ]]
_EOT
destination = "local/jikkou.conf"
perms = 755
}
# Kafka topics definition
template {
data = <<_EOT
[[- if isKind "string" $c.topics ]]
[[ $c.topics ]]
[[- else ]]
# Invalid kafka topics definition
[[- end ]]
_EOT
destination = "local/kafka-topics.yml"
}
# Kafka ACL definition
template {
data = <<_EOT
[[- if isKind "string" $c.acls ]]
[[ $c.acls ]]
[[- else ]]
# Invalid kafka ACL definition
[[- end ]]
_EOT
destination = "local/kafka-acls.yml"
}
# Client cert used by Jikkou to connect to the Kafka brokers
template {
data = <<_EOT
{{- with pkiCert "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-jikkou" "common_name=[[ .instance ]]-jikkou" "ttl=15m" }}
{{ .Cert }}
{{ .Key }}
{{- end }}
_EOT
destination = "secrets/jikkou.bundle.pem"
uid = 100000
gid = 100000
perms = "0400"
}
# CA certificate used to validate the brokers' certificates
template {
data = <<_EOT
{{ with secret "[[ $c.vault.pki.path ]]/cert/ca_chain" }}{{ .Data.ca_chain }}{{ end }}
_EOT
destination = "secrets/jikkou.ca.pem"
}
[[ template "common/resources" $c ]]
}
}
}


@ -0,0 +1,13 @@
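# Configuration used by Kafka CLI tools inside the broker container:
# mutual TLS against the brokers, bootstrap servers discovered from Consul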
security.protocol=SSL
ssl.keystore.location=/secrets/kafka.p12
ssl.keystore.password=password
ssl.keystore.type=PKCS12
ssl.truststore.location=/secrets/kafka.ca.pem
ssl.truststore.type=PEM
ssl.client.auth=required
ssl.secure.random.implementation=SHA1PRNG
client.dns.lookup=use_all_dns_ips
bootstrap.servers=
{{- range $index, $instance := service "[[ .instance ]]-broker[[ .consul.suffix ]]" -}}
{{ if ne $index 0 }},{{ end }}{{ $instance.Address }}:{{ $instance.Port }}
{{- end }}


@ -0,0 +1,6 @@
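// SASL credentials used by the broker to authenticate against ZooKeeper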
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="[[ .zookeeper.user ]]"
password="[[ .zookeeper.password ]]"
;
};


@ -0,0 +1,37 @@
# Broker ID
broker.id={{ env "NOMAD_ALLOC_INDEX" }}
# Network settings
listeners=CLIENT://:{{ env "NOMAD_PORT_client" }},BROKER://:{{ env "NOMAD_PORT_broker" }}
listener.security.protocol.map=CLIENT:SSL,BROKER:SSL
inter.broker.listener.name=BROKER
advertised.listeners=CLIENT://broker-{{ env "NOMAD_ALLOC_INDEX" }}.[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]]:{{ env "NOMAD_HOST_PORT_client" }},BROKER://{{ env "NOMAD_ADDR_broker" }}
# TLS Settings
ssl.keystore.location=/secrets/kafka.p12
ssl.keystore.password=password
ssl.keystore.type=PKCS12
ssl.truststore.location=/secrets/kafka.ca.pem
ssl.truststore.type=PEM
ssl.client.auth=required
ssl.secure.random.implementation=SHA1PRNG
ssl.principal.mapping.rules=\
RULE:^CN=([^,]+),OU=.*$/$1/,\
DEFAULT
# Storage settings
log.dirs=/data/kafka
# Zookeeper settings
zookeeper.connect=[[ join .zookeeper.servers "," ]][[ if has .zookeeper "path" ]][[ .zookeeper.path ]][[ end ]]
# ACL
[[- if .use_acl ]]
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=[[ range $idx, $user := .super_users ]][[ if ne $idx 0 ]];[[ end ]]User:[[ $user ]][[ end ]]
[[ end ]]
# Other settings
[[- range $k, $v := .settings ]]
[[ $k ]]=[[ $v ]]
[[- end ]]


@ -0,0 +1,19 @@
#!/bin/sh
set -euo pipefail
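# Start kafka_exporter with mutual TLS on both sides: client certificates to reach
# the brokers, and a TLS protected metrics endpoint restricted to the monitoring CA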
exec kafka_exporter \
--tls.enabled \
--tls.ca-file=/local/kafka.ca.pem \
--tls.cert-file=/secrets/kafka.bundle.pem \
--tls.key-file=/secrets/kafka.bundle.pem \
--server.tls.enabled \
--server.tls.mutual-auth-enabled \
--server.tls.ca-file=/local/monitoring.ca.pem \
--server.tls.cert-file=/secrets/metrics.bundle.pem \
--server.tls.key-file=/secrets/metrics.bundle.pem \
{{- range $index, $instance := service "[[ .instance ]]-broker[[ .consul.suffix ]]" }}
--kafka.server={{ $instance.Address }}:{{ $instance.Port }} \
{{- end }}
--web.listen-address=:${NOMAD_ALLOC_PORT_metrics}

17
templates/jikkou/jikkou Normal file

@ -0,0 +1,17 @@
#!/bin/sh
set -eux
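# Apply the declarative topic and ACL definitions rendered by Nomad against the cluster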
jikkou config set-context [[ .instance ]] --config-file=/local/jikkou.conf
jikkou config view --name [[ .instance ]]
echo "Create and modify topics"
jikkou update \
--selector="kind IN (KafkaTopic,KafkaTopicList)" \
--files=/local/kafka-topics.yml
echo "Apply ACLs"
jikkou apply \
--selector="kind IN (KafkaPrincipalRole,KafkaPrincipalAuthorization)" \
--options=delete-orphans=true \
--files=/local/kafka-acls.yml


@ -0,0 +1,19 @@
jikkou {
kafka {
client {
bootstrap.servers = "
{{- range $index, $instance := service "[[ .instance ]]-broker[[ .consul.suffix ]]" -}}
{{ if ne $index 0 }},{{ end }}{{ $instance.Address }}:{{ $instance.Port }}
{{- end -}}
"
security.protocol = "SSL"
ssl.keystore.location = "/secrets/jikkou.p12"
ssl.keystore.type = "PKCS12"
ssl.keystore.password = "password"
ssl.truststore.location = "/secrets/jikkou.ca.pem"
ssl.truststore.type = "PEM"
client.id = "[[ .instance ]]-jikkou"
client.dns.lookup = "use_all_dns_ips"
}
}
}

86
variables.yml Normal file

@ -0,0 +1,86 @@
---
instance: kafka
vault:
pki:
path: "[[ .vault.root ]]pki/kafka"
ou: Kafka Cluster ([[ .instance ]])
kafka:
broker:
count: 3
version: 3.7.0
image: '[[ .docker.repo ]]kafka-broker:[[ .kafka.broker.version ]]-1'
env: {}
resources:
cpu: 100
memory: 1024
prometheus:
enabled: '[[ .prometheus.available ]]'
# static_client_port: 9092
# static_broker_port: 9095
consul:
meta:
broker: 'broker-${NOMAD_ALLOC_INDEX}.[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]]'
connect:
upstreams:
- destination_name: zookeeper[[ .consul.suffix ]]
local_bind_port: 2181
vault:
policies:
- '[[ .instance ]]-broker[[ .consul.suffix ]]'
wait_for:
- service: zookeeper[[ .consul.suffix ]]
count: 2
zookeeper:
servers:
- 127.0.0.1:2181
# path: /kafka
user: ""
password: ""
use_acl: true
super_users:
- '[[ .instance ]]-jikkou'
settings:
log.retention.hours: 168
compression.type: zstd
zookeeper.connection.timeout.ms: 600
zookeeper.set.acl: true
message.max.bytes: 1073741824
auto.create.topics.enable: false
transactional.id.expiration.ms: 2147483647
offsets.retention.minutes: 13140
volumes:
data:
type: csi
source: '[[ .instance ]]-broker-data'
per_alloc: true
jikkou:
version: 0.33.3
image: '[[ .docker.repo ]]jikkou:[[ .kafka.jikkou.version ]]-1'
env: {}
resources:
cpu: 10
memory: 256
vault:
policies:
- '[[ .instance ]]-jikkou[[ .consul.suffix ]]'
topics: ""
acls: ""
exporter:
version: 1.7.0
image: '[[ .docker.repo ]]kafka-exporter:[[ .kafka.exporter.version ]]-1'
env: {}
wait_for:
- service: '[[ .instance ]]-broker[[ .consul.suffix ]]'
count: '[[ .kafka.broker.count | mul 0.5 | add 1 | math.Floor ]]'
vault:
policies:
- '[[ .instance ]]-exporter[[ .consul.suffix ]]'
- metrics[[ .consul.suffix ]]
resources:
cpu: 10
memory: 50


@ -0,0 +1,4 @@
[[- $c := merge .kafka.broker .kafka . ]]
path "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-broker" {
capabilities = ["update"]
}


@ -0,0 +1,5 @@
[[- $c := merge .kafka.exporter .kafka . ]]
path "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-exporter" {
capabilities = ["update"]
}


@ -0,0 +1,4 @@
[[- $c := merge .kafka . ]]
path "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-jikkou" {
capabilities = ["update"]
}