123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- // Copyright 2012 Gary Burd
- //
- // Licensed under the Apache License, Version 2.0 (the "License"): you may
- // not use this file except in compliance with the License. You may obtain
- // a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- // License for the specific language governing permissions and limitations
- // under the License.
- package redis
- import (
- "fmt"
- "reflect"
- "sync"
- "testing"
- )
- func publish(channel, value interface{}) {
- c, err := dial()
- if err != nil {
- fmt.Println(err)
- return
- }
- defer c.Close()
- c.Do("PUBLISH", channel, value)
- }
- // Applications can receive pushed messages from one goroutine and manage subscriptions from another goroutine.
- func ExamplePubSubConn() {
- c, err := dial()
- if err != nil {
- fmt.Println(err)
- return
- }
- defer c.Close()
- var wg sync.WaitGroup
- wg.Add(2)
- psc := PubSubConn{Conn: c}
- // This goroutine receives and prints pushed notifications from the server.
- // The goroutine exits when the connection is unsubscribed from all
- // channels or there is an error.
- go func() {
- defer wg.Done()
- for {
- switch n := psc.Receive().(type) {
- case Message:
- fmt.Printf("Message: %s %s\n", n.Channel, n.Data)
- case PMessage:
- fmt.Printf("PMessage: %s %s %s\n", n.Pattern, n.Channel, n.Data)
- case Subscription:
- fmt.Printf("Subscription: %s %s %d\n", n.Kind, n.Channel, n.Count)
- if n.Count == 0 {
- return
- }
- case error:
- fmt.Printf("error: %v\n", n)
- return
- }
- }
- }()
- // This goroutine manages subscriptions for the connection.
- go func() {
- defer wg.Done()
- psc.Subscribe("example")
- psc.PSubscribe("p*")
- // The following function calls publish a message using another
- // connection to the Redis server.
- publish("example", "hello")
- publish("example", "world")
- publish("pexample", "foo")
- publish("pexample", "bar")
- // Unsubscribe from all connections. This will cause the receiving
- // goroutine to exit.
- psc.Unsubscribe()
- psc.PUnsubscribe()
- }()
- wg.Wait()
- // Output:
- // Subscription: subscribe example 1
- // Subscription: psubscribe p* 2
- // Message: example hello
- // Message: example world
- // PMessage: p* pexample foo
- // PMessage: p* pexample bar
- // Subscription: unsubscribe example 1
- // Subscription: punsubscribe p* 0
- }
- func expectPushed(t *testing.T, c PubSubConn, message string, expected interface{}) {
- actual := c.Receive()
- if !reflect.DeepEqual(actual, expected) {
- t.Errorf("%s = %v, want %v", message, actual, expected)
- }
- }
- func TestPushed(t *testing.T) {
- pc, err := DialDefaultServer()
- if err != nil {
- t.Fatalf("error connection to database, %v", err)
- }
- defer pc.Close()
- sc, err := DialDefaultServer()
- if err != nil {
- t.Fatalf("error connection to database, %v", err)
- }
- defer sc.Close()
- c := PubSubConn{Conn: sc}
- c.Subscribe("c1")
- expectPushed(t, c, "Subscribe(c1)", Subscription{Kind: "subscribe", Channel: "c1", Count: 1})
- c.Subscribe("c2")
- expectPushed(t, c, "Subscribe(c2)", Subscription{Kind: "subscribe", Channel: "c2", Count: 2})
- c.PSubscribe("p1")
- expectPushed(t, c, "PSubscribe(p1)", Subscription{Kind: "psubscribe", Channel: "p1", Count: 3})
- c.PSubscribe("p2")
- expectPushed(t, c, "PSubscribe(p2)", Subscription{Kind: "psubscribe", Channel: "p2", Count: 4})
- c.PUnsubscribe()
- expectPushed(t, c, "Punsubscribe(p1)", Subscription{Kind: "punsubscribe", Channel: "p1", Count: 3})
- expectPushed(t, c, "Punsubscribe()", Subscription{Kind: "punsubscribe", Channel: "p2", Count: 2})
- pc.Do("PUBLISH", "c1", "hello")
- expectPushed(t, c, "PUBLISH c1 hello", Message{Channel: "c1", Data: []byte("hello")})
- c.Ping("hello")
- expectPushed(t, c, `Ping("hello")`, Pong{"hello"})
- c.Conn.Send("PING")
- c.Conn.Flush()
- expectPushed(t, c, `Send("PING")`, Pong{})
- }
|