Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 60 additions & 8 deletions pkg/mqtt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package mqtt
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"math/rand"
"path"
Expand All @@ -17,19 +19,21 @@ type Client interface {
GetTopic(string) (*Topic, bool)
IsConnected() bool
Subscribe(string) *Topic
Publish(string, map[string]any, string) (json.RawMessage, error)
Unsubscribe(string)
Dispose()
}

type Options struct {
URI string `json:"uri"`
Username string `json:"username"`
Password string `json:"password"`
ClientID string `json:"clientID"`
TLSCACert string `json:"tlsCACert"`
TLSClientCert string `json:"tlsClientCert"`
TLSClientKey string `json:"tlsClientKey"`
TLSSkipVerify bool `json:"tlsSkipVerify"`
URI string `json:"uri"`
Username string `json:"username"`
Password string `json:"password"`
ClientID string `json:"clientID"`
TLSCACert string `json:"tlsCACert"`
TLSClientCert string `json:"tlsClientCert"`
TLSClientKey string `json:"tlsClientKey"`
TLSSkipVerify bool `json:"tlsSkipVerify"`
EnablePublishing bool `json:"enablePublishing"`
}

type client struct {
Expand Down Expand Up @@ -191,6 +195,54 @@ func (c *client) Unsubscribe(reqPath string) {
}
}

func (c *client) Publish(topic string, payload map[string]any, responseTopic string) (json.RawMessage, error) {
var response json.RawMessage
var err error
done := make(chan struct{}, 1)

if responseTopic != "" {
tokenSub := c.client.Subscribe(responseTopic, 2, func(c paho.Client, m paho.Message) {
response = m.Payload()
done <- struct{}{}
})

if !tokenSub.WaitTimeout(time.Second) && tokenSub.Error() != nil {
err = errors.Join(err, tokenSub.Error())
return response, err
}

defer c.client.Unsubscribe(responseTopic)
} else {
done <- struct{}{}
}

data, errMarshal := json.Marshal(&payload)
if errMarshal != nil {
err = errors.Join(err, errMarshal)
return response, err
}

token := c.client.Publish(topic, 2, false, data)

if token.Error() != nil {
err = errors.Join(err, token.Error())
return response, err
}

if !token.WaitTimeout(time.Second) {
err = errors.Join(err, errors.New("publish timeout"))
return response, err
}

select {
case <-done:
case <-time.After(time.Second):
err = errors.Join(err, errors.New("subscribe timeout"))
}

return response, err
}

func (c *client) Dispose() {
log.DefaultLogger.Info("MQTT Disconnecting")
c.client.Disconnect(250)
Expand Down
6 changes: 4 additions & 2 deletions pkg/mqtt/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ type Message struct {

// Topic represents a MQTT topic.
type Topic struct {
Path string `json:"topic"`
StreamingKey string `json:"streamingKey,omitempty"`
Path string `json:"topic"`
Payload map[string]any `json:"payload,omitempty"`
ResponsePath string `json:"response,omitempty"`
StreamingKey string `json:"streamingKey,omitempty"`
Interval time.Duration
Messages []Message
framer *framer
Expand Down
14 changes: 8 additions & 6 deletions pkg/plugin/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,21 @@ func NewMQTTInstance(_ context.Context, s backend.DataSourceInstanceSettings) (i
return nil, err
}

return NewMQTTDatasource(client, s.UID), nil
return NewMQTTDatasource(client, s.UID, settings.EnablePublishing), nil
}

type MQTTDatasource struct {
Client mqtt.Client
channelPrefix string
Client mqtt.Client
channelPrefix string
enablePublishing bool
}

// NewMQTTDatasource creates a new datasource instance.
func NewMQTTDatasource(client mqtt.Client, uid string) *MQTTDatasource {
func NewMQTTDatasource(client mqtt.Client, uid string, enablePublishing bool) *MQTTDatasource {
return &MQTTDatasource{
Client: client,
channelPrefix: path.Join("ds", uid),
Client: client,
channelPrefix: path.Join("ds", uid),
enablePublishing: enablePublishing,
}
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/plugin/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package plugin_test

import (
"context"
"encoding/json"
"testing"

"github.com/grafana/grafana-plugin-sdk-go/backend"
Expand All @@ -15,7 +16,7 @@ func TestCheckHealthHandler(t *testing.T) {
ds := plugin.NewMQTTDatasource(&fakeMQTTClient{
connected: true,
subscribed: false,
}, "xyz")
}, "xyz", false)

res, _ := ds.CheckHealth(
context.Background(),
Expand All @@ -30,7 +31,7 @@ func TestCheckHealthHandler(t *testing.T) {
ds := plugin.NewMQTTDatasource(&fakeMQTTClient{
connected: false,
subscribed: false,
}, "xyz")
}, "xyz", false)

res, _ := ds.CheckHealth(
context.Background(),
Expand Down Expand Up @@ -60,5 +61,8 @@ func (c *fakeMQTTClient) IsSubscribed(_ string) bool {
}

func (c *fakeMQTTClient) Subscribe(_ string) *mqtt.Topic { return nil }
func (c *fakeMQTTClient) Unsubscribe(_ string) {}
func (c *fakeMQTTClient) Dispose() {}
func (c *fakeMQTTClient) Publish(string, map[string]any, string) (json.RawMessage, error) {
return json.RawMessage{}, nil
}
func (c *fakeMQTTClient) Unsubscribe(_ string) {}
func (c *fakeMQTTClient) Dispose() {}
23 changes: 17 additions & 6 deletions pkg/plugin/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,24 @@ func (ds *MQTTDatasource) query(query backend.DataQuery) backend.DataResponse {
return response
}

t.Interval = query.Interval
// Subscribe
if len(t.Payload) == 0 || !ds.enablePublishing {
t.Interval = query.Interval

frame := data.NewFrame("")
frame.SetMeta(&data.FrameMeta{
Channel: path.Join(ds.channelPrefix, t.Key()),
})
frame := data.NewFrame("")
frame.SetMeta(&data.FrameMeta{
Channel: path.Join(ds.channelPrefix, t.Key()),
})

response.Frames = append(response.Frames, frame)
response.Frames = append(response.Frames, frame)
return response
}

// Publish
resp, err := ds.Client.Publish(t.Path, t.Payload, t.ResponsePath)

field := data.NewField("Body", data.Labels{}, []json.RawMessage{resp})
response.Frames = append(response.Frames, data.NewFrame("Response", field))
response.Error = err
return response
}
11 changes: 11 additions & 0 deletions src/ConfigEditor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ export const ConfigEditor = (props: DataSourcePluginOptionsEditorProps<MqttDataS
</ConfigSection>
</>
) : null}

<Divider />

<ConfigSection title="Experimental">
<Field
label="Enable Publishing"
description="Enables publishing of MQTT messages by sending a payload object inside the query object."
>
<Switch onChange={onSwitchChanged('enablePublishing')} value={jsonData.enablePublishing || false} />
</Field>
</ConfigSection>
</>
);
};
7 changes: 1 addition & 6 deletions src/datasource.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
import {
DataQueryRequest,
DataQueryResponse,
DataSourceInstanceSettings,
ScopedVars,
} from '@grafana/data';
import { DataQueryRequest, DataQueryResponse, DataSourceInstanceSettings, ScopedVars } from '@grafana/data';
import { DataSourceWithBackend, getTemplateSrv } from '@grafana/runtime';
import { MqttDataSourceOptions, MqttQuery } from './types';
import { Observable, from, switchMap } from 'rxjs';
Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface MqttDataSourceOptions extends DataSourceJsonData {
tlsAuth: boolean;
tlsAuthWithCACert: boolean;
tlsSkipVerify: boolean;
enablePublishing: boolean;
}

export interface MqttSecureJsonData {
Expand Down