Kafka connect CLI inspired by kaf and kubectl. Kofr wraps all of kafka connect REST API operations.

Add a new connect cluster

$ kofr config add-cluster dev --hosts http://localhost:8083

Change context to a cluster:

$ kofr config use-cluster dev

List available clusters

$ kofr config get-clusters

Connectors operations

List running connectors

$ kofr ls
 NAME                STATE     TASKS   TYPE     WORKER_ID
 load-kafka-config   RUNNING   1       SOURCE
 test-connector      RUNNING   1       SINK

List current connect cluster status

$ kofr cluster status
 Current Cluster: dev
 id : "GnW0xXSmqeO-t6CQVTJg"
 HOST                      STATE
 http://localhost:8083     Online
 http://localhost:8080     Offline

Describing a connector

$ kofr cn describe <connector-name>
  "name": "test-connector",
  "config": {
    "name": "test-connector",
    "tasks.max": "1",
    "topics": "test-topic",
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector"
  "connector": {
    "state": "RUNNING",
    "worker_id": ""
  "tasks": [
      "id": 0,
      "state": "RUNNING",
      "worker_id": ""
  "type": "sink"

Create a connector from a configuration file, like kubectl.

$ echo '{"name":"loadd-kafka-config", "config":{"connector.class":"FileStreamSource","file":"config/server.properties","topic":"kafka-config-topic"}}' \
| kofr cn create -f -

Edit a running connector config, this will open $EDITOR, similar to kubectl.

$ kofr cn edit <connector-name>

Restarting, pausing and resuming a connector.

$ kofr cn restart <connector-name>
# alternatively, you can specify tasks options
$ kofr cn restart <connector-name> --include-tasks
$ kofr cn restart <connector-name> --only-failed

$ kofr cn pause <connector-name>
$ kofr cn resume <connector-name>

Patch a running connector with new configuration

$ kofr cn patch test-connector -d '{"file": "config/server.properties","name": "load-kafka-config","connector.class": "FileStreamSource","topic": "kafka-config-topic"'

Delete a running connector

$ kofr cn delete <connector-name>

Tasks operations

List tasks of a running connector

$ kofr tasks ls test-connector
Active tasks of connector: 'test-connector'
 0    RUNNING   -
 1    PAUSED   -

Restarting a task

$ kofr task restart sink-connector 0

Getting a task status

$ kofr task status test-connector 0
  "config": {
    "task.class": "org.apache.kafka.connect.file.FileStreamSinkTask",
    "topics": "test-topic"
  "status": {
    "id": 0,
    "state": "RUNNING",
    "worker_id": ""

Connect plugins

List installed plugins on the cluster

$ kofr plugin ls

validate a given connector confiugration with a connector plugin.

$ echo '{"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "test-topic"
}' | kofr plugin validate-config -f -


By default, kofr reads config from ~/.kofr/config See examples for a basic config file.


I welcome fixes for bugs or better ways of doing things or more importantly, code reviews. Kofr was made by the motivation of solving a problem when having to deal with multiple kafka connect clusters at my work was mundane and more importantly, learning rust wink-wink. I use it personally like I use kubectl or kaf. See issues for things I'd like to improve. If you have a question about the codebase or simply want to discuss something feel free to open an issue. See also kcmockserver which I used in testing kofr. It's incomplete and could use some contributions.

