# golang nsq & Kafka

https://cloud.tencent.com/developer/article/1820268
https://ithelp.ithome.com.tw/articles/10194183
https://iter01.com/547005.html

# docker compose

![](https://i.imgur.com/wCRDhn0.png)

docker compose

```yaml=
version: '3'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160"
      - "4161"
  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "4150"
      - "4151"
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    depends_on:
      - nsqlookupd
    ports:
      - "4171"
```

```bash=
docker compose up -d
docker compose down
docker compose ps
```

# nsq-consumer

```go=
package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/nsqio/go-nsq"
)

// myMessageHandler logs every message delivered to the consumer.
type myMessageHandler struct{}

// HandleMessage logs the received message and its body converted to a string.
// It always returns nil, i.e. it never reports a processing failure.
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
	log.Printf("接收到了一個訊息:%v", m)
	log.Printf("這個訊息轉換成字串就是:%v", string(m.Body))
	return nil
}

func main() {
	// Create the consumer for topic "GONSQ_TOPIC" on channel "channel".
	config := nsq.NewConfig()
	consumer, err := nsq.NewConsumer("GONSQ_TOPIC", "channel", config)
	if err != nil {
		log.Fatal(err)
	}

	// Register the handler that runs whenever a message is received.
	consumer.AddHandler(&myMessageHandler{})

	// Connect directly to nsqd.
	err = consumer.ConnectToNSQD("localhost:56117")
	if err != nil {
		log.Fatal(err)
	}

	// Block here so main does not return right away; wait for SIGINT/SIGTERM.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan

	// Stop only initiates the shutdown; receive on StopChan so the process
	// does not exit before the consumer has finished stopping.
	consumer.Stop()
	<-consumer.StopChan
}
```

# nsq-producer

```go=
package main

import (
	"log"

	"github.com/nsqio/go-nsq"
)

func main() {
	// Create a producer.
	config := nsq.NewConfig()
	producer, err := nsq.NewProducer("127.0.0.1:56117", config)
	if err != nil {
		log.Fatal(err)
	}

	messageBody := []byte("hello")
	topicName := "GONSQ_TOPIC"

	// Publish to the topic defined above.
	err = producer.Publish(topicName, messageBody)
	if err != nil {
		log.Fatal(err)
	}

	producer.Stop()
}
```

![](https://i.imgur.com/NRxzPdX.png)

# kafka

![](https://i.imgur.com/7OpsY67.png)

zookeeper 算是類似 spring eureka的東西

https://codeantenna.com/a/Ne1DCTJikV
https://cutejaneii.wordpress.com/2017/06/19/docker-7-%E7%94%A8docker%E5%BB%BA%E7%AB%8Bkafka%E6%9C%8D%E5%8B%99%E4%B8%8A/
https://www.cnblogs.com/swordfall/p/10014300.html
https://www.codeleading.com/article/56604973219/

# docker compose

```yaml=
networks:
  kafka-cluster:
    name: kafka-cluster
    driver: bridge
services:
  zookeeper:
    image: bitnami/zookeeper:3.6.2
    container_name: zookeeper
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    networks:
      - kafka-cluster
  kafka1:
    image: bitnami/kafka:2.7.0
    container_name: kafka1
    ports:
      - '9093:9093'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka1:9092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
    depends_on:
      - zookeeper
    networks:
      - kafka-cluster
  kafka2:
    image: bitnami/kafka:2.7.0
    container_name: kafka2
    ports:
      - '9094:9094'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka2:9092,EXTERNAL://localhost:9094
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
    depends_on:
      - zookeeper
    networks:
      - kafka-cluster
  kafka3:
    image: bitnami/kafka:2.7.0
    container_name: kafka3
    ports:
      - '9095:9095'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9095
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka3:9092,EXTERNAL://localhost:9095
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
    depends_on:
      - zookeeper
    networks:
      - kafka-cluster
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    ports:
      - 9000:9000
    environment:
      - KAFKA_BROKERCONNECT=kafka1:9092,kafka2:9092,kafka3:9092
    depends_on:
      - kafka1
    networks:
      - kafka-cluster
```

# kafka-consumer

```go=
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9093"}, nil)
	if err != nil {
		fmt.Println("consumer connect error:", err)
		return
	}
	fmt.Println("connnect success...")
	defer consumer.Close()

	partitions, err := consumer.Partitions("revolution")
	if err != nil {
		fmt.Println("geet partitions failed, err:", err)
		return
	}

	var (
		wg                 sync.WaitGroup
		partitionConsumers []sarama.PartitionConsumer
	)

	// Start one goroutine per partition, reading from the oldest offset.
	for _, p := range partitions {
		partitionConsumer, err := consumer.ConsumePartition("revolution", p, sarama.OffsetOldest)
		if err != nil {
			fmt.Println("partitionConsumer err:", err)
			continue
		}
		partitionConsumers = append(partitionConsumers, partitionConsumer)

		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			// The loop ends once the partition consumer is shut down and
			// its Messages channel is closed.
			for m := range pc.Messages() {
				fmt.Printf("key: %s, text: %s, offset: %d\n", string(m.Key), string(m.Value), m.Offset)
			}
		}(partitionConsumer)
	}

	// Nothing is being consumed, so there is nothing to wait for.
	if len(partitionConsumers) == 0 {
		return
	}

	// Keep consuming until SIGINT/SIGTERM arrives.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan

	// Every partition consumer must be closed before the parent consumer is
	// closed (by the deferred consumer.Close above). AsyncClose returns
	// immediately; the goroutines keep draining Messages until it is closed.
	for _, pc := range partitionConsumers {
		pc.AsyncClose()
	}
	wg.Wait()
}
```

# kafka-producer

```go=
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	// Create a new sarama config instance.
	config := sarama.NewConfig()
	// WaitForAll waits for all in-sync replicas to commit before responding.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true

	// Create a new synchronous producer.
	client, err := sarama.NewSyncProducer([]string{"localhost:9093"}, config)
	if err != nil {
		fmt.Println("create producer failed, err:", err)
		return
	}
	defer client.Close()

	// Define the first message to produce: topic, key and content.
	msg := &sarama.ProducerMessage{}
	msg.Topic = "revolution"
	msg.Key = sarama.StringEncoder("miles")
	msg.Value = sarama.StringEncoder("hello world...")

	// Send the first message and check its error before sending the next
	// one, so a failure here is not overwritten by the second call.
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send message failed,", err)
		return
	}

	// Define and send the second message.
	msg2 := &sarama.ProducerMessage{}
	msg2.Topic = "revolution"
	msg2.Key = sarama.StringEncoder("monroe")
	msg2.Value = sarama.StringEncoder("hello world2...")

	pid2, offset2, err := client.SendMessage(msg2)
	if err != nil {
		fmt.Println("send message failed,", err)
		return
	}

	fmt.Printf("pid:%v offset:%v\n", pid, offset)
	fmt.Printf("pid2:%v offset2:%v\n", pid2, offset2)
}
```