Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions internal/endpoint/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,33 @@ import (

const kafkaExpiresAfter = time.Second * 30

// kafkaMaxLifetime is the maximum time a Kafka producer connection is kept
// alive before being recycled, regardless of recent activity (idle expiry is
// handled separately by kafkaExpiresAfter). This prevents slow memory growth
// in the sarama library's internal state (metadata cache, metrics registry,
// buffers) that accumulates in long-lived producers.
const kafkaMaxLifetime = time.Minute * 30

// KafkaConn is an endpoint connection
type KafkaConn struct {
mu sync.Mutex
ep Endpoint
conn sarama.SyncProducer
cfg *sarama.Config
ex bool
t time.Time
mu sync.Mutex
ep Endpoint
conn sarama.SyncProducer
cfg *sarama.Config
ex bool
t time.Time
createdAt time.Time
}

// Expired returns true if the connection has expired
// Expired returns true if the connection has expired due to inactivity or
// exceeding its maximum lifetime.
func (conn *KafkaConn) Expired() bool {
conn.mu.Lock()
defer conn.mu.Unlock()
if !conn.ex {
if time.Since(conn.t) > kafkaExpiresAfter {
idleExpired := time.Since(conn.t) > kafkaExpiresAfter
lifetimeExpired := !conn.createdAt.IsZero() &&
time.Since(conn.createdAt) > kafkaMaxLifetime
if idleExpired || lifetimeExpired {
conn.close()
conn.ex = true
}
Expand All @@ -55,6 +66,7 @@ func (conn *KafkaConn) close() {
conn.conn = nil
conn.cfg.MetricRegistry.UnregisterAll()
conn.cfg = nil
conn.createdAt = time.Time{}
}
}

Expand Down Expand Up @@ -170,6 +182,7 @@ func (conn *KafkaConn) Send(msg string) error {

conn.conn = c
conn.cfg = cfg
conn.createdAt = time.Now()
}

// parse json again to get out info for our kafka key
Expand Down
26 changes: 26 additions & 0 deletions internal/server/aofshrink.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/tidwall/btree"
Expand All @@ -18,6 +19,31 @@ const maxkeys = 8
const maxids = 32
const maxchunk = 4 * 1024 * 1024

// watchAutoAOFShrink periodically checks whether the AOF file has grown past
// the configured aofshrink-min-size threshold and, if so, kicks off a
// background shrink. A threshold of 0 disables automatic shrinking, and
// triggers are rate-limited to at most one per minute.
func (s *Server) watchAutoAOFShrink(wg *sync.WaitGroup) {
	defer wg.Done()
	var lastTriggered time.Time
	s.loopUntilServerStops(time.Second*10, func() {
		threshold := s.config.aofshrinkMinSize()
		switch {
		case threshold == 0:
			// Feature disabled via config.
			return
		case time.Since(lastTriggered) < time.Minute:
			// Cooldown: don't re-trigger in rapid succession.
			return
		}
		s.mu.RLock()
		size, inProgress := s.aofsz, s.shrinking
		s.mu.RUnlock()
		if inProgress || int64(size) <= threshold {
			return
		}
		log.Infof("auto aof shrink triggered: aof_size=%d threshold=%d",
			size, threshold)
		lastTriggered = time.Now()
		go s.aofshrink()
	})
}

func (s *Server) aofshrink() {
start := time.Now()
s.mu.Lock()
Expand Down
43 changes: 34 additions & 9 deletions internal/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ const (
AutoGC = "autogc"
KeepAlive = "keepalive"
LogConfig = "logconfig"
AnnounceIP = "replica_announce_ip"
AnnouncePort = "replica_announce_port"
AnnounceIP = "replica_announce_ip"
AnnouncePort = "replica_announce_port"
AOFShrinkMinSize = "aofshrink-min-size"
)

var validProperties = []string{RequirePass, LeaderAuth, ProtectedMode, MaxMemory, AutoGC, KeepAlive, LogConfig, ReplicaPriority, AnnouncePort, AnnounceIP}
var validProperties = []string{RequirePass, LeaderAuth, ProtectedMode, MaxMemory, AutoGC, KeepAlive, LogConfig, ReplicaPriority, AnnouncePort, AnnounceIP, AOFShrinkMinSize}

// Config is a tile38 config
type Config struct {
Expand Down Expand Up @@ -68,10 +69,12 @@ type Config struct {
_keepAlive int64
_logConfigP interface{}
_logConfig string
_announceIPP string
_announceIP string
_announcePortP string
_announcePort int64
_announceIPP string
_announceIP string
_announcePortP string
_announcePort int64
_aofshrinkMinSizeP string
_aofshrinkMinSize int64
}

func loadConfig(path string) (*Config, error) {
Expand Down Expand Up @@ -100,8 +103,9 @@ func loadConfig(path string) (*Config, error) {
_autoGCP: gjson.Get(json, AutoGC).String(),
_keepAliveP: gjson.Get(json, KeepAlive).String(),
_logConfig: gjson.Get(json, LogConfig).String(),
_announceIPP: gjson.Get(json, AnnounceIP).String(),
_announcePortP: gjson.Get(json, AnnouncePort).String(),
_announceIPP: gjson.Get(json, AnnounceIP).String(),
_announcePortP: gjson.Get(json, AnnouncePort).String(),
_aofshrinkMinSizeP: gjson.Get(json, AOFShrinkMinSize).String(),
}

if config._serverID == "" {
Expand Down Expand Up @@ -145,6 +149,9 @@ func loadConfig(path string) (*Config, error) {
if err := config.setProperty(AnnouncePort, config._announcePortP, true); err != nil {
return nil, err
}
if err := config.setProperty(AOFShrinkMinSize, config._aofshrinkMinSizeP, true); err != nil {
return nil, err
}
config.write(false)
return config, nil
}
Expand Down Expand Up @@ -184,6 +191,7 @@ func (config *Config) write(writeProperties bool) {
} else {
config._announcePortP = strconv.FormatUint(uint64(config._announcePort), 10)
}
config._aofshrinkMinSizeP = formatMemSize(config._aofshrinkMinSize)
}

m := make(map[string]interface{})
Expand Down Expand Up @@ -239,6 +247,9 @@ func (config *Config) write(writeProperties bool) {
if config._announcePortP != "" {
m[AnnouncePort] = config._announcePortP
}
if config._aofshrinkMinSizeP != "" {
m[AOFShrinkMinSize] = config._aofshrinkMinSizeP
}
data, err := json.MarshalIndent(m, "", "\t")
if err != nil {
panic(err)
Expand Down Expand Up @@ -373,6 +384,12 @@ func (config *Config) setProperty(name, value string, fromLoad bool) error {
config._announcePort = int64(announcePort)
}
}
case AOFShrinkMinSize:
sz, ok := parseMemSize(value)
if !ok {
return clientErrorf("Invalid argument '%s' for CONFIG SET '%s'", value, name)
}
config._aofshrinkMinSize = sz
}

if invalid {
Expand Down Expand Up @@ -422,6 +439,8 @@ func (config *Config) getProperty(name string) string {
return config._announceIP
case AnnouncePort:
return strconv.FormatUint(uint64(config._announcePort), 10)
case AOFShrinkMinSize:
return formatMemSize(config._aofshrinkMinSize)
}
}

Expand Down Expand Up @@ -566,6 +585,12 @@ func (config *Config) announcePort() int {
config.mu.RUnlock()
return int(v)
}
// aofshrinkMinSize returns the configured aofshrink-min-size value in bytes,
// read under the config lock. A value of 0 means auto-shrink is disabled.
func (config *Config) aofshrinkMinSize() int64 {
	config.mu.RLock()
	defer config.mu.RUnlock()
	return config._aofshrinkMinSize
}
func (config *Config) setFollowHost(v string) {
config.mu.Lock()
config._followHost = v
Expand Down
17 changes: 13 additions & 4 deletions internal/server/expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,19 @@ func (s *Server) backgroundExpireObjects(now time.Time) {
})
return true
})
isLeader := s.config.followHost() == ""
for _, msg := range msgs {
_, d, err := s.cmdDEL(msg)
if err != nil {
log.Fatal(err)
}
if err := s.writeAOF(msg.Args, &d); err != nil {
log.Fatal(err)
// Only write DEL to AOF on the leader. Followers can deterministically
// expire the same items via TTL, so replicating these deletions is
// redundant and causes unnecessary AOF growth.
if isLeader {
if err := s.writeAOF(msg.Args, &d); err != nil {
log.Fatal(err)
}
}
}
if len(msgs) > 0 {
Expand All @@ -69,13 +75,16 @@ func (s *Server) backgroundExpireHooks(now time.Time) {
return true
})

isLeader := s.config.followHost() == ""
for _, msg := range msgs {
_, d, err := s.cmdDelHook(msg)
if err != nil {
log.Fatal(err)
}
if err := s.writeAOF(msg.Args, &d); err != nil {
log.Fatal(err)
if isLeader {
if err := s.writeAOF(msg.Args, &d); err != nil {
log.Fatal(err)
}
}
}
if len(msgs) > 0 {
Expand Down
13 changes: 7 additions & 6 deletions internal/server/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,13 +635,14 @@ func (h *Hook) proc() (ok bool) {
// get keys and vals
err := tx.AscendGreaterOrEqual("hooks",
h.query, func(key, val string) bool {
if strings.HasPrefix(key, hookLogPrefix) {
// Verify this hooks name matches the one in the notif
if h.Name == gjson.Get(val, "hook").String() {
keys = append(keys, key)
vals = append(vals, val)
}
// The "hooks" index is sorted by JSON "hook" field. Once we
// encounter an entry for a different hook, all of our entries
// have been consumed and we can stop the scan.
if gjson.Get(val, "hook").String() != h.Name {
return false
}
keys = append(keys, key)
vals = append(vals, val)
return true
},
)
Expand Down
2 changes: 2 additions & 0 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,8 @@ func Serve(opts Options) error {
bgwg.Add(1)
go s.backgroundSyncAOF(&bgwg)
bgwg.Add(1)
go s.watchAutoAOFShrink(&bgwg)
bgwg.Add(1)
go s.startPublishQueue(&bgwg)
defer func() {
log.Debug("Stopping background routines")
Expand Down