Skip to content

Commit

Permalink
[#12]: feat(reset): implement service resets
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Apr 14, 2022
2 parents 0258981 + 9d5d597 commit d2c00e1
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ jobs:
- name: Run linter
uses: golangci/golangci-lint-action@v3.1.0 # Action page: <https://github.com/golangci/golangci-lint-action>
with:
version: v1.45 # without patch version
version: v1.45.1 # without patch version
only-new-issues: false # show only new issues if it's a pull request
args: --timeout=10m --build-tags=race
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Documentation: <https://github.com/golangci/golangci-lint#config-file>

run:
go: '1.17'
timeout: 1m
skip-dirs:
- .github
Expand Down
88 changes: 68 additions & 20 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Plugin struct {
cfg Config

// all processes attached to the service
processes sync.Map // uuid -> *Process
processes sync.Map // key -> []*Process
}

func (p *Plugin) Init(cfg config.Configurer, log *zap.Logger) error {
Expand Down Expand Up @@ -50,43 +50,88 @@ func (p *Plugin) Serve() chan error {

for k := range p.cfg.Services {
// create needed number of the processes
procs := make([]*Process, p.cfg.Services[k].ProcessNum)

for i := 0; i < p.cfg.Services[k].ProcessNum; i++ {
// create processor structure, which will process all the services
p.processes.Store(k, NewServiceProcess(p.cfg.Services[k], p.logger))
procs[i] = NewServiceProcess(p.cfg.Services[k], p.logger)
}
}

p.processes.Range(func(key, value interface{}) bool {
proc := value.(*Process)
// store all the processes idents
p.processes.Store(k, procs)
}

err := proc.start()
if err != nil {
errCh <- err
return false
p.processes.Range(func(key, value any) bool {
procs := value.([]*Process)

for i := 0; i < len(procs); i++ {
cmdStr := procs[i].service.Command
err := procs[i].start()
if err != nil {
errCh <- err
return false
}
p.logger.Info("service have started", zap.String("name", key.(string)), zap.String("command", cmdStr))
}
p.logger.Info("service have started", zap.String("name", key.(string)), zap.String("command", proc.command.String()))

return true
})
}()

return errCh
}

func (p *Plugin) Reset() error {
p.processes.Range(func(key, value any) bool {
procs := value.([]*Process)

newProcs := make([]*Process, len(procs))

for i := 0; i < len(procs); i++ {
procs[i].stop()

service := &Service{}
*service = *(procs[i]).service

newProc := NewServiceProcess(service, p.logger)
err := newProc.start()
if err != nil {
p.logger.Error("unable to start the service", zap.String("name", key.(string)))
p.processes.Delete(key)
return true
}

newProcs[i] = newProc
procs[i].command.Stderr = nil
procs[i].command.Stdout = nil
procs[i] = nil
p.processes.Delete(key)
}

p.processes.Store(key, newProcs)
return true
})

return nil
}

func (p *Plugin) Workers() []*process.State {
p.Lock()
defer p.Unlock()
states := make([]*process.State, 0, 5)

p.processes.Range(func(key, value interface{}) bool {
k := key.(string)
proc := value.(*Process)
procs := value.([]*Process)

st, err := generalProcessState(proc.pid, proc.command.String())
if err != nil {
p.logger.Error("get process state", zap.String("name", k), zap.String("command", proc.command.String()))
return true
for i := 0; i < len(procs); i++ {
st, err := generalProcessState(procs[i].pid, procs[i].command.String())
if err != nil {
p.logger.Error("get process state", zap.String("name", k), zap.String("command", procs[i].command.String()))
return true
}
states = append(states, st)
}
states = append(states, st)

return true
})
Expand All @@ -97,12 +142,15 @@ func (p *Plugin) Workers() []*process.State {
func (p *Plugin) Stop() error {
p.processes.Range(func(key, value interface{}) bool {
k := key.(string)
proc := value.(*Process)
procs := value.([]*Process)

for i := 0; i < len(procs); i++ {
procs[i].stop()

proc.stop()
p.logger.Info("service have stopped", zap.String("name", k), zap.String("command", procs[i].service.Command))
p.processes.Delete(key)
}

p.logger.Info("service have stopped", zap.String("name", k), zap.String("command", proc.service.Command))
p.processes.Delete(key)
return true
})

Expand Down
62 changes: 35 additions & 27 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,13 @@ func (r *rpc) Terminate(in *serviceV1.Service, out *serviceV1.Response) error {
return fmt.Errorf("no such service: %s", in.GetName())
}

proc := procInterface.(*Process)
proc.stop()
proc = nil
procs := procInterface.([]*Process)
for i := 0; i < len(procs); i++ {
procs[i].stop()
procs[i] = nil
}

out.Ok = true

return nil
}

Expand All @@ -86,24 +87,29 @@ func (r *rpc) Restart(in *serviceV1.Service, out *serviceV1.Response) error {
return fmt.Errorf("no such service: %s", in.GetName())
}

proc := procInterface.(*Process)
proc.stop()
procs := procInterface.([]*Process)

service := &Service{}
*service = *(proc).service
proc = nil
newProcs := make([]*Process, len(procs))
for i := 0; i < len(procs); i++ {
procs[i].stop()

newProc := NewServiceProcess(service, r.p.logger)
err := newProc.start()
if err != nil {
service := &Service{}
*service = *(procs[i]).service
procs[i] = nil

newProc := NewServiceProcess(service, r.p.logger)
err := newProc.start()
if err != nil {
r.p.processes.Delete(in.GetName())
return err
}

newProcs[i] = newProc
r.p.processes.Delete(in.GetName())
return err
}

r.p.processes.Delete(in.GetName())
r.p.processes.Store(in.GetName(), newProc)
r.p.processes.Store(in.GetName(), newProcs)
out.Ok = true

return nil
}

Expand All @@ -123,27 +129,29 @@ func (r *rpc) Status(in *serviceV1.Service, out *serviceV1.Status) error {
return fmt.Errorf("no such service: %s", in.GetName())
}

proc := procInterface.(*Process)
state, err := generalProcessState(proc.pid, proc.command.String())
if err != nil {
return err
}
procs := procInterface.([]*Process)

out.Pid = int32(state.Pid)
out.Command = state.Command
out.CPUPercent = float32(state.CPUPercent)
out.MemoryUsage = state.MemoryUsage
for i := 0; i < len(procs); i++ {
state, err := generalProcessState(procs[i].pid, procs[i].command.String())
if err != nil {
return err
}

out.Pid = int32(state.Pid)
out.Command = state.Command
out.CPUPercent = float32(state.CPUPercent)
out.MemoryUsage = state.MemoryUsage
}

return nil
}

func (r *rpc) List(_ *serviceV1.Service, out *serviceV1.List) error {
r.p.logger.Debug("services list")

r.mu.RLock()
defer r.mu.RUnlock()

r.p.processes.Range(func(key, value interface{}) bool {
r.p.logger.Debug("services list", zap.String("service", key.(string)))
out.Services = append(out.Services, key.(string))
return true
})
Expand Down

0 comments on commit d2c00e1

Please sign in to comment.