diff --git a/pkg/mqtt/client.go b/pkg/mqtt/client.go index 7aa9488..04d46d4 100644 --- a/pkg/mqtt/client.go +++ b/pkg/mqtt/client.go @@ -3,6 +3,8 @@ package mqtt import ( "crypto/tls" "crypto/x509" + "encoding/json" + "errors" "fmt" "math/rand" "path" @@ -17,19 +19,21 @@ type Client interface { GetTopic(string) (*Topic, bool) IsConnected() bool Subscribe(string) *Topic + Publish(string, map[string]any, string) (json.RawMessage, error) Unsubscribe(string) Dispose() } type Options struct { - URI string `json:"uri"` - Username string `json:"username"` - Password string `json:"password"` - ClientID string `json:"clientID"` - TLSCACert string `json:"tlsCACert"` - TLSClientCert string `json:"tlsClientCert"` - TLSClientKey string `json:"tlsClientKey"` - TLSSkipVerify bool `json:"tlsSkipVerify"` + URI string `json:"uri"` + Username string `json:"username"` + Password string `json:"password"` + ClientID string `json:"clientID"` + TLSCACert string `json:"tlsCACert"` + TLSClientCert string `json:"tlsClientCert"` + TLSClientKey string `json:"tlsClientKey"` + TLSSkipVerify bool `json:"tlsSkipVerify"` + EnablePublishing bool `json:"enablePublishing"` } type client struct { @@ -191,6 +195,54 @@ func (c *client) Unsubscribe(reqPath string) { } } +func (c *client) Publish(topic string, payload map[string]any, responseTopic string) (json.RawMessage, error) { + var response json.RawMessage + var err error + done := make(chan struct{}, 1) + + if responseTopic != "" { + tokenSub := c.client.Subscribe(responseTopic, 2, func(c paho.Client, m paho.Message) { + response = m.Payload() + done <- struct{}{} + }) + + if !tokenSub.WaitTimeout(time.Second) && tokenSub.Error() != nil { + err = errors.Join(err, tokenSub.Error()) + return response, err + } + + defer c.client.Unsubscribe(responseTopic) + } else { + done <- struct{}{} + } + + data, errMarshal := json.Marshal(&payload) + if errMarshal != nil { + err = errors.Join(err, errMarshal) + return response, err + } + + token := c.client.Publish(topic, 2, false, data) + + if token.Error() != nil { + err = errors.Join(err, token.Error()) + return response, err + } + + if !token.WaitTimeout(time.Second) { + err = errors.Join(err, errors.New("publish timeout")) + return response, err + } + + select { + case <-done: + case <-time.After(time.Second): + err = errors.Join(err, errors.New("subscribe timeout")) + } + + return response, err +} + func (c *client) Dispose() { log.DefaultLogger.Info("MQTT Disconnecting") c.client.Disconnect(250) diff --git a/pkg/mqtt/topic.go b/pkg/mqtt/topic.go index 029c4b1..511efd6 100644 --- a/pkg/mqtt/topic.go +++ b/pkg/mqtt/topic.go @@ -18,8 +18,10 @@ type Message struct { // Topic represents a MQTT topic. type Topic struct { - Path string `json:"topic"` - StreamingKey string `json:"streamingKey,omitempty"` + Path string `json:"topic"` + Payload map[string]any `json:"payload,omitempty"` + ResponsePath string `json:"response,omitempty"` + StreamingKey string `json:"streamingKey,omitempty"` Interval time.Duration Messages []Message framer *framer diff --git a/pkg/plugin/datasource.go b/pkg/plugin/datasource.go index 83de2a4..e19b9d4 100644 --- a/pkg/plugin/datasource.go +++ b/pkg/plugin/datasource.go @@ -32,19 +32,21 @@ func NewMQTTInstance(_ context.Context, s backend.DataSourceInstanceSettings) (i return nil, err } - return NewMQTTDatasource(client, s.UID), nil + return NewMQTTDatasource(client, s.UID, settings.EnablePublishing), nil } type MQTTDatasource struct { - Client mqtt.Client - channelPrefix string + Client mqtt.Client + channelPrefix string + enablePublishing bool } // NewMQTTDatasource creates a new datasource instance. -func NewMQTTDatasource(client mqtt.Client, uid string) *MQTTDatasource { +func NewMQTTDatasource(client mqtt.Client, uid string, enablePublishing bool) *MQTTDatasource { return &MQTTDatasource{ - Client: client, - channelPrefix: path.Join("ds", uid), + Client: client, + channelPrefix: path.Join("ds", uid), + enablePublishing: enablePublishing, } } diff --git a/pkg/plugin/datasource_test.go b/pkg/plugin/datasource_test.go index 96f8b65..7da5db9 100644 --- a/pkg/plugin/datasource_test.go +++ b/pkg/plugin/datasource_test.go @@ -2,6 +2,7 @@ package plugin_test import ( "context" + "encoding/json" "testing" "github.com/grafana/grafana-plugin-sdk-go/backend" @@ -15,7 +16,7 @@ func TestCheckHealthHandler(t *testing.T) { ds := plugin.NewMQTTDatasource(&fakeMQTTClient{ connected: true, subscribed: false, - }, "xyz") + }, "xyz", false) res, _ := ds.CheckHealth( context.Background(), @@ -30,7 +31,7 @@ func TestCheckHealthHandler(t *testing.T) { ds := plugin.NewMQTTDatasource(&fakeMQTTClient{ connected: false, subscribed: false, - }, "xyz") + }, "xyz", false) res, _ := ds.CheckHealth( context.Background(), @@ -60,5 +61,8 @@ func (c *fakeMQTTClient) IsSubscribed(_ string) bool { } func (c *fakeMQTTClient) Subscribe(_ string) *mqtt.Topic { return nil } -func (c *fakeMQTTClient) Unsubscribe(_ string) {} -func (c *fakeMQTTClient) Dispose() {} +func (c *fakeMQTTClient) Publish(string, map[string]any, string) (json.RawMessage, error) { + return json.RawMessage{}, nil +} +func (c *fakeMQTTClient) Unsubscribe(_ string) {} +func (c *fakeMQTTClient) Dispose() {} diff --git a/pkg/plugin/query.go b/pkg/plugin/query.go index 9243b6a..750d2c4 100644 --- a/pkg/plugin/query.go +++ b/pkg/plugin/query.go @@ -39,13 +39,24 @@ func (ds *MQTTDatasource) query(query backend.DataQuery) backend.DataResponse { return response } - t.Interval = query.Interval + // Subscribe + if len(t.Payload) == 0 || !ds.enablePublishing { + t.Interval = query.Interval - frame := data.NewFrame("") - frame.SetMeta(&data.FrameMeta{ - Channel: path.Join(ds.channelPrefix, t.Key()), - }) + frame := data.NewFrame("") + frame.SetMeta(&data.FrameMeta{ + Channel: path.Join(ds.channelPrefix, t.Key()), + }) - response.Frames = append(response.Frames, frame) + response.Frames = append(response.Frames, frame) + return response + } + + // Publish + resp, err := ds.Client.Publish(t.Path, t.Payload, t.ResponsePath) + + field := data.NewField("Body", data.Labels{}, []json.RawMessage{resp}) + response.Frames = append(response.Frames, data.NewFrame("Response", field)) + response.Error = err return response } diff --git a/src/ConfigEditor.tsx b/src/ConfigEditor.tsx index 9984aab..b0d63f2 100644 --- a/src/ConfigEditor.tsx +++ b/src/ConfigEditor.tsx @@ -119,6 +119,17 @@ export const ConfigEditor = (props: DataSourcePluginOptionsEditorProps ) : null} + + + + + + + + ); }; diff --git a/src/datasource.ts b/src/datasource.ts index 887d642..277df34 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -1,9 +1,4 @@ -import { - DataQueryRequest, - DataQueryResponse, - DataSourceInstanceSettings, - ScopedVars, -} from '@grafana/data'; +import { DataQueryRequest, DataQueryResponse, DataSourceInstanceSettings, ScopedVars } from '@grafana/data'; import { DataSourceWithBackend, getTemplateSrv } from '@grafana/runtime'; import { MqttDataSourceOptions, MqttQuery } from './types'; import { Observable, from, switchMap } from 'rxjs'; diff --git a/src/types.ts b/src/types.ts index fc3113d..816384b 100644 --- a/src/types.ts +++ b/src/types.ts @@ -14,6 +14,7 @@ export interface MqttDataSourceOptions extends DataSourceJsonData { tlsAuth: boolean; tlsAuthWithCACert: boolean; tlsSkipVerify: boolean; + enablePublishing: boolean; } export interface MqttSecureJsonData {