From 3a8826a7fb29dc8c4f28b6488de72b1afdc8a427 Mon Sep 17 00:00:00 2001 From: Lieven Hollevoet Date: Sun, 15 Mar 2015 20:14:30 +0100 Subject: [PATCH 1/4] Initial import of mqqt code from Neil that was submitted on the mailing list --- lib/mqtt.pm | 517 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 517 insertions(+) create mode 100644 lib/mqtt.pm diff --git a/lib/mqtt.pm b/lib/mqtt.pm new file mode 100644 index 000000000..52b3d7f6e --- /dev/null +++ b/lib/mqtt.pm @@ -0,0 +1,517 @@ +# ------------------------------------------------------------------------------ +=begin comment +@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ + +File: + MQTT.pm + +Description: + This is the base interface class for Message Queue Telemetry Transport + (MQTT) + + For more information about the MQTT protocol: + http://mqtt.org + +Author(s): + Neil Cherry + + Based loosely on the UPMPIM.pm code + - Jason Sharpee + +License: + This free software is licensed under the terms of the GNU public license. + +Usage: + Use these mh.ini parameters to enable this code: + + mqtt_host=test.mosquitto.org + mqtt_server_port=1883 + mqtt_port=1883 + mqtt_topic=home/# + mqtt_user=user (optional) + mqtt_password=password (optional) + mqtt_keepalive=120 (optional) + +Notes: + - + +Special Thanks to: + Bruce Winter - MH + Jason Sharpee - MH UPB pkg + +@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ + +=head1 B + +Neil Cherry + +=head2 SYNOPSIS + +NONE + +=head2 DESCRIPTION + +Misterhouse MQTT interface for use with any MQTT service + +MQTT website: http://mqtt.org/ + +Notes: + +Need: +MQTT Device + test.mosquitto.org + 1883 +or + mozart.uucp + 1883 + +The test.mosquitto.org server listens on ports 1883, 8883, 8884 and +8885. Port 1883 is the standard unencrypted MQTT port and can be used +with any MQTT client. Ports 8883 and 8884 use certificate based +SSL/TLS encryption (TLSv1.2, TLSv1.1 or TLSv1.0) and require client +support to connect. + +Web sockets are not supported here in MH + +MQTT Item Pub (WO) + Device + 'topic' + +MQTT Item Sub (RO) + Device + 'topic' + +Topics (examples) + # + /ha/# + /ha/house/livingroom/lamp/ + /ha/weather/temp + /ha/weather/windspeed/ + +For now we'll use the wildcard. I don't recommend using the '#' if the +MQTT is test.mqtt.org Rather pick something a bit more unique like + /username/ha/# +and play from there + +Because of the wildcard it probably makes sense to support multiple mqtt +connections. This would allow for: + + home/ha/x10/# + home/ha/z-wave/# + home/ha/zigbee/# + +instead of: + + # + +Which could cover things like this: + + home/email/... + home/statistics/... + offsite/... + +If you're using a home mqtt then this might not be such as issue. + +The intial device needs some kind of way to tell that it's still connected to +the MQTT server (MQTT Ping comes to mind). + +=head2 INHERITS + +B + +=head2 METHODS + +=over + +=item B + +=cut +# ------------------------------------------------------------------------------ +package mqtt; + +@mqtt::ISA = ('Generic_Item'); + +use strict; +use warnings; + +use Net::MQTT::Constants qw/:all/; +use Net::MQTT::Message; + +use IO::Select; +use IO::Socket::INET; + +use Time::HiRes; + +use Data::Dumper; + +eval "use bytes"; # Not on all installs, so eval to avoid errors + +# Need to share this with the outside world +my $verbose = 5; +my $buf = ''; +my $got_ping_response = 1; +my $next_ping; + +my $keep_alive_timer = 120; + +my $socket; + +my $msg_id = 1; + +my %MQTT_Data; + +# Configuration variables +my $started; # True if running already + +# Configuration variables + +# +# mqtt_server_port = +# loads mqtt.pm +# calls ${module}::${type}_startup($instance) +# mqtt::server_startup(mqtt) +# +=item C + +Called by the MH main script as a result of defining a server port in the ini file. + +=cut +sub server_startup { + my ($instance) = @_; # mqtt, mqtt_1, etc. + + # Don't start this instance twice + + # + my $host = "127.0.0.1"; + my $port = 8883; + my $topic = "home/ha/#"; + + &main::print_log("*** MQTT Instance $instance"."_host"); + ### + ### For now we'll only worry about 1 instance + ### + $host = $::config_parms{$instance . "_host"}; + $port = $::config_parms{$instance . "_server_port"}; # Yes a bit weird + $topic = $::config_parms{$instance . "_topic"}; + $topic =~ s/"//g; + + $keep_alive_timer = $::config_parms{$instance . "_keepalive"}; + + &main::print_log("*** Opening MQTT ($instance) connection to $host/$port/$topic"); + &main::print_log("*** $host"); + &main::print_log("*** $port"); + &main::print_log("*** $topic"); + &main::print_log("*** $keep_alive_timer"); + + ### ------------------------------------------------------------------------ + ### 1) open a socket (host, port and keepalive + $socket = IO::Socket::INET->new(PeerAddr => $host . ':' . $port, + Timeout => 240, ); + + return if(!defined($socket)); + $MQTT_Data{$instance}{'socket'} = $socket; + $MQTT_Data{$instance}{'got_ping_response'} = 1; # Why 1? + $MQTT_Data{$instance}{'next_ping'} = 0; # + + # -------------------------------------------------------------------------- + # We're good to here (socket is connected) + + ### 2) Send MQTT_CONNECT + send_mqtt_msg($socket, message_type => MQTT_CONNECT, keep_alive_timer => $keep_alive_timer); + + # Bur when we get here, poof, socket is closed + ### 3) Check for ACK or fail + + &main::print_log("*** NJC Socket check ($keep_alive_timer) [$!]: " . ($socket->connected ? "Connected" : "Failed")); + my $msg = read_mqtt_msg_timeout($socket, $buf); + if(!$msg) { + &main::print_log ("XXX NJC No ConnAck "); + exit 1; + return; + } + + &main::print_log ("*** NJC Received: " . $msg->string . "\n") if ($verbose >= 2); + + ### 4) Send a subscribe '#' (we'll have many of these, one for each device) + ### I don't know if this is a good idea or not but that's what I intend to do for now + send_mqtt_msg($socket, message_type => MQTT_SUBSCRIBE, + message_id => $msg_id++, + topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } $topic ]); + + ### 5) Check for ACK or fail$msg = read_mqtt_msg($socket, $buf) or die "No SubAck\n"; + print 'Received: ', $msg->string, "\n" if ($verbose >= 2); + + ### 6) check for data + &main::print_log ("*** NJC Initializing MQTT connection ..."); + + #return ; + + if (1 == scalar(keys %MQTT_Data)) { # Add hooks on first call only + &main::print_log ("*** NJC added MQTT poll ..."); + &::MainLoop_pre_add_hook(\&mqtt::check_for_data, 1); + } else { + &main::print_log ("*** NJC already added MQTT poll ..." . scalar(keys %MQTT_Data) ); + &main::print_log ("*** NJC already added MQTT poll ... but that's okay" ); + #exit 1; + } + ### 6a) publish a hello to our initial subscription + ### 6b) check for ping response (hmmm, need to thing about this) +} +# ------------------------------------------------------------------------------ +# Handle device I/O: Read and write messages on the bus +=item C + +Called at the start of every loop. This checks either the serial or server port +for new data. If data is found, the data is broken down into individual +messages and sent to C to be parsed. The message is then +compared to the previous data received if this is a duplicate message it is +logged and ignored. If this is a new message it is sent to C. + +03/13/15 11:15:14 AM *** NJC read_mqtt_msg Receive buffer: + 30 1d 00 0c 68 6f 6d 65 2f 68 61 2f 74 65 73 74 0...home/ha/test + 64 65 70 72 65 63 61 74 65 64 20 61 6c 73 6f deprecated also +03/13/15 11:15:14 AM *** NJC rcv'd: Publish/at-most-once home/ha/test + 64 65 70 72 65 63 61 74 65 64 20 61 6c 73 6f deprecated also + +We see this every $keep_alive_timeout seconds +03/13/15 11:17:08 AM *** NJC read_mqtt_msg Receive buffer: + d0 00 .. +03/13/15 11:17:08 AM *** NJC Received: PingResp/at-most-once + +=cut +my @outqueue = (); # Queue of messages to be sent +my $count = 0; # Number of passes since last message sent + +sub check_for_data { + foreach my $inst (keys %MQTT_Data) { + my $socket = $MQTT_Data{$inst}{'socket'}; + my $self = $MQTT_Data{$inst}{self}; + ### MQTT stuff below + + # This one doesn't block + my $msg = read_mqtt_msg($socket, $buf); + + if ($msg) { + ### + ### Okay this is the hard part + ### For now I'm only worried about data that fits into 1 read + ### + ### I've got a message, I think it has + ### $msg->topic + ### $msg->message + ### I need to map this into the user device + ### the top is the address, the message the value + ### + if ($msg->message_type == MQTT_PUBLISH) { + &main::print_log ("*** NJC check_for_data R: Dumper: " . Dumper($msg)); + ### + ### Someone published something, deal with it + ### + if ($verbose == 0) { + &main::print_log ("*** NJC check_for_data rcv'd: T:" . $msg->topic, ", M:", $msg->message); + } else { + &main::print_log ("*** NJC check_for_data rcv'd: T:" . $msg->topic, ", M:", $msg->message); + &main::print_log ("*** NJC check_for_data rcv'd: S:" . $msg->string . ","); + } + + #main::print_log("$port_name got: [$::Serial_Ports{$port_name}{data_record}]"); + #$MQTT_Data{$instance}{'obj'}->_parse_data_to_obj($msg); + #$self->parse_data_to_obj($msg); + + ### + ### So we want the topic and the message + ### topic = address or identifier of the device + ### message = is the value of the device + ### + ### For now let's not get too fancy + ### $self->{$msg->topic}->{message} = $msg->message + ### $self->{$msg->topic}->{timestamp} = timestamp + ### + } elsif ($msg->message_type == MQTT_PINGRESP) { + $got_ping_response = 1; + &main::print_log ('*** NJC check_for_data Ping rcvd: ' . $msg->string) if ($verbose >= 3); + } else { + &main::print_log ("*** NJC check_for_data Received: $msg->string") if ($verbose >= 2); + } + } + + # This is where we need to check for outgoing + # Check for output + if ($#outqueue >= 0) { + my $mref = shift @outqueue; + + ### + ### Okay this is the other hard part + ### + #send_mqtt_msg($mref)); + &main::print_log ("*** NJC check_for_data send_mqtt_msg $mref "); + } + + } # End of foreach $socket + + ### + ### We need to deal with the pint for each socket, not 1 ping for all + ### + # Ping check + if (Time::HiRes::time > $next_ping) { + &main::print_log ("*** NJC read_mqtt_msg Ping Response timeout.") unless ($got_ping_response); + return unless ($got_ping_response); + send_mqtt_msg($socket, message_type => MQTT_PINGREQ); + } +} +# ------------------------------------------------------------------------------ +=item C<()> +=cut +sub send_mqtt_msg { + my $socket = shift; + + my $msg = Net::MQTT::Message->new(@_); + $msg = $msg->bytes; + + # syswrite ? + syswrite $socket, $msg, length $msg; + $next_ping = Time::HiRes::time + $keep_alive_timer; +} +# ------------------------------------------------------------------------------ +=item C +=cut +sub read_mqtt_msg { + my $socket = shift; + my $select = IO::Select->new($socket); + my $timeout = $next_ping - Time::HiRes::time; + + do { + ### + ### I really need to sit down and figure this out + ### + my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1); + + #&main::print_log ("*** NJC read_mqtt_msg n " . $_[0]); + + #print "0"; + #&main::print_log ("*** NJC read_mqtt_msg 0"); + return $mqtt if (defined $mqtt); + + #print "1"; + #&main::print_log ("*** NJC read_mqtt_msg 1"); + ### very short wait + ### Return if there is no data + $select->can_read(0.1) || return; + + #print "2"; + #&main::print_log ("*** NJC read_mqtt_msg 2"); + $timeout = $next_ping - Time::HiRes::time; + + #print "3"; + #&main::print_log ("*** NJC read_mqtt_msg 3"); + # Sysread ? sysread FILEHANDLE,SCALAR,LENGTH,OFFSET + my $bytes = sysread $socket, $_[0], 2048, length $_[0]; + + #print "4"; + #&main::print_log ("*** NJC read_mqtt_msg 4"); + + unless ($bytes) { + &main::print_log ("*** NJC read_mqtt_msg Socket closed " . (defined $bytes ? 'gracefully' : 'error')); + return; + } + + #&main::print_log ("*** NJC read_mqtt_msg Receive buffer: " . dump_string($_[0])) if ($verbose >= 3); + } while ($timeout > 0); + + #&main::print_log ("*** NJC read_mqtt_msg 5"); + return; +} + +# ------------------------------------------------------------------------------ +=item C +=cut +sub read_mqtt_msg_timeout { + my $socket = shift; + &main::print_log ("*** NJC read_mqtt_msg_timeout Socket " . ($socket->connected ? "Connected" : "Failed")); + my $select = IO::Select->new($socket); + my $timeout = $next_ping - Time::HiRes::time; + + do { + my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1); + + #&main::print_log ("*** NJC read_mqtt_msg n " . $_[0]); + + #print "0"; + #&main::print_log ("*** NJC read_mqtt_msg 0"); + return $mqtt if (defined $mqtt); + + #print "1"; + &main::print_log ("*** NJC read_mqtt_msg 1 ($timeout)"); + ### This is where it waits (blocking) + ### + $select->can_read($timeout) || return; + + #print "2"; + #&main::print_log ("*** NJC read_mqtt_msg 2"); + $timeout = $next_ping - Time::HiRes::time; + + #print "3"; + #&main::print_log ("*** NJC read_mqtt_msg 3"); + # Sysread ? sysread FILEHANDLE,SCALAR,LENGTH,OFFSET + my $bytes = sysread $socket, $_[0], 2048, length $_[0]; + + &main::print_log ("*** NJC sysread Socket " . ($socket->connected ? "Connected" : "Failed")); + + #print "4"; + #&main::print_log ("*** NJC read_mqtt_msg 4"); + + unless ($bytes) { + &main::print_log ("*** NJC read_mqtt_msg Socket closed " . (defined $bytes ? 'gracefully' : 'error') . " ($timeout)($!)"); + return; + } + + &main::print_log ("*** NJC read_mqtt_msg Receive buffer: " . dump_string($_[0])) if ($verbose >= 3); + } while ($timeout > 0); + + #&main::print_log ("*** NJC read_mqtt_msg 5"); + return; +} +# ------------------------------------------------------------------------------ +=item C +=cut +sub set { + my ($self, $state, $set_by) = @_; + +} +# ------------------------------------------------------------------------------ +=begin comment +=item C + +Used to send commands to the interface. +=cut +sub new { + bless $self, $class; + + return $self; +} +# ------------------------------------------------------------------------------ +=item C +Take the data and parse it to the MH obj() + +$msg = bless( { 'topic' => 'home/ha/test/x10/A1', + 'remaining' => '', + 'retain' => 1, + 'dup' => 0, + 'message_type' => 3, + 'qos' => 0, + 'message' => 'A1AON' + }, + 'Net::MQTT::Message::Publish' +); + +=cut +sub parse_data_to_obj { + my ($self, $msg) = @_; + + for my $obj (@{$$self{objects}}) { + $obj->set($msg,$self); + } +} +# -[ Fini ]--------------------------------------------------------------------- + +return 1; From d0e53c72f57654fe1c120c2fc0b2bf08a422ad70 Mon Sep 17 00:00:00 2001 From: Lieven Hollevoet Date: Sat, 21 Mar 2015 16:48:27 +0100 Subject: [PATCH 2/4] Update received from Neil. Further evolution of the code. Consider this to be first working version. --- lib/mqtt.pm | 793 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 531 insertions(+), 262 deletions(-) diff --git a/lib/mqtt.pm b/lib/mqtt.pm index 52b3d7f6e..6ec77e3b2 100644 --- a/lib/mqtt.pm +++ b/lib/mqtt.pm @@ -1,9 +1,9 @@ # ------------------------------------------------------------------------------ =begin comment -@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ + @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ File: - MQTT.pm + mqtt.pm Description: This is the base interface class for Message Queue Telemetry Transport @@ -12,121 +12,133 @@ Description: For more information about the MQTT protocol: http://mqtt.org -Author(s): + Author(s): Neil Cherry - Based loosely on the UPMPIM.pm code - - Jason Sharpee + Based loosely on the UPMPIM.pm and SqueezeCLI.pm code + - Jason Sharpee (UPB) + - (SqueezeCLI) License: This free software is licensed under the terms of the GNU public license. Usage: - Use these mh.ini parameters to enable this code: - mqtt_host=test.mosquitto.org - mqtt_server_port=1883 - mqtt_port=1883 - mqtt_topic=home/# - mqtt_user=user (optional) - mqtt_password=password (optional) - mqtt_keepalive=120 (optional) + .mht file: + + # MQTT stuff + CODE, require mqtt; #noloop + CODE, $mqtt_1 = new mqtt('mqtt_1', '127.0.0.1', 1883, 'home/ha/#', 121); #noloop + #CODE, $mqtt_2 = new mqtt('mqtt_2', 'test.mosquitto.org', 1883, 'home/test/#', 122); #noloop + #CODE, $mqtt_3 = new mqtt('mqtt_3', '127.0.0.1', 1883, 'home/network/#', 122); #noloop + CODE, $CR_Temp = new mqtt_Item($mqtt_1, "home/ha/text/x10/1"); #noloop + + CODE, $CR_Temp->set("Off"); + + CLI generation of a command to the CR_Temp + + mosquitto_pub -d -h 127.0.0.1 -q 0 -t home/ha/text/x10/1 -m "Off" + +Example initialization: + + $myMQTT = new mqtt("MQTT",,,,,,); Notes: - -Special Thanks to: + Special Thanks to: Bruce Winter - MH Jason Sharpee - MH UPB pkg -@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ + @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ -=head1 B + =head1 B -Neil Cherry + Neil Cherry -=head2 SYNOPSIS + =head2 SYNOPSIS -NONE + NONE -=head2 DESCRIPTION + =head2 DESCRIPTION -Misterhouse MQTT interface for use with any MQTT service + Misterhouse MQTT interface for use with any MQTT service -MQTT website: http://mqtt.org/ + MQTT website: http://mqtt.org/ Notes: Need: -MQTT Device + MQTT Device test.mosquitto.org 1883 -or + or mozart.uucp 1883 -The test.mosquitto.org server listens on ports 1883, 8883, 8884 and -8885. Port 1883 is the standard unencrypted MQTT port and can be used -with any MQTT client. Ports 8883 and 8884 use certificate based -SSL/TLS encryption (TLSv1.2, TLSv1.1 or TLSv1.0) and require client -support to connect. + The test.mosquitto.org server listens on ports 1883, 8883, 8884 and + 8885. Port 1883 is the standard unencrypted MQTT port and can be used + with any MQTT client. Ports 8883 and 8884 use certificate based + SSL/TLS encryption (TLSv1.2, TLSv1.1 or TLSv1.0) and require client + support to connect. -Web sockets are not supported here in MH + Web sockets are not supported here in MH -MQTT Item Pub (WO) + MQTT Item Pub (WO) Device 'topic' -MQTT Item Sub (RO) + MQTT Item Sub (RO) Device 'topic' -Topics (examples) + Topics (examples) # /ha/# /ha/house/livingroom/lamp/ /ha/weather/temp /ha/weather/windspeed/ -For now we'll use the wildcard. I don't recommend using the '#' if the -MQTT is test.mqtt.org Rather pick something a bit more unique like - /username/ha/# -and play from there + For now we'll use the wildcard. I'll think about a rewrite later without the + wild card support. I don't recommend using the '#' if the MQTT is test.mqtt.org + Rather pick something a bit more unique like /username/ha/# and play from there -Because of the wildcard it probably makes sense to support multiple mqtt -connections. This would allow for: + Because of the wildcard it probably makes sense to support multiple mqtt + connections. This would allow for: home/ha/x10/# home/ha/z-wave/# home/ha/zigbee/# -instead of: + instead of: # -Which could cover things like this: + Which could cover things like this: home/email/... home/statistics/... offsite/... -If you're using a home mqtt then this might not be such as issue. + If you're using a home mqtt then this might not be such as issue. -The intial device needs some kind of way to tell that it's still connected to -the MQTT server (MQTT Ping comes to mind). + The intial device needs some kind of way to tell that it's still connected to + the MQTT server (MQTT Ping comes to mind). -=head2 INHERITS + =head2 INHERITS -B + B -=head2 METHODS + =head2 METHODS -=over + =over -=item B + =item B =cut # ------------------------------------------------------------------------------ +use constant { TRUE => 1, FALSE => 0 }; + package mqtt; @mqtt::ISA = ('Generic_Item'); @@ -142,154 +154,199 @@ use IO::Socket::INET; use Time::HiRes; +#use JSON qw( decode_json ); # + use Data::Dumper; eval "use bytes"; # Not on all installs, so eval to avoid errors # Need to share this with the outside world -my $verbose = 5; -my $buf = ''; -my $got_ping_response = 1; -my $next_ping; - -my $keep_alive_timer = 120; - -my $socket; - -my $msg_id = 1; +my $verbose = 5; +my $buf = ''; +my $msg_id = 1; my %MQTT_Data; -# Configuration variables -my $started; # True if running already - -# Configuration variables - -# -# mqtt_server_port = -# loads mqtt.pm -# calls ${module}::${type}_startup($instance) -# mqtt::server_startup(mqtt) -# -=item C - -Called by the MH main script as a result of defining a server port in the ini file. +# ------------------------------------------------------------------------------ +sub dump() { + &main::print_log("*** mqtt Dumper (MQTT_Data):\n" . Dumper(\%MQTT_Data) . "***"); +} +# ------------------------------------------------------------------------------ +=item =cut -sub server_startup { - my ($instance) = @_; # mqtt, mqtt_1, etc. - - # Don't start this instance twice - - # - my $host = "127.0.0.1"; - my $port = 8883; - my $topic = "home/ha/#"; +sub mqtt_connect() { + my ($self) = @_; - &main::print_log("*** MQTT Instance $instance"."_host"); - ### - ### For now we'll only worry about 1 instance - ### - $host = $::config_parms{$instance . "_host"}; - $port = $::config_parms{$instance . "_server_port"}; # Yes a bit weird - $topic = $::config_parms{$instance . "_topic"}; - $topic =~ s/"//g; - - $keep_alive_timer = $::config_parms{$instance . "_keepalive"}; - - &main::print_log("*** Opening MQTT ($instance) connection to $host/$port/$topic"); - &main::print_log("*** $host"); - &main::print_log("*** $port"); - &main::print_log("*** $topic"); - &main::print_log("*** $keep_alive_timer"); - - ### ------------------------------------------------------------------------ ### 1) open a socket (host, port and keepalive - $socket = IO::Socket::INET->new(PeerAddr => $host . ':' . $port, - Timeout => 240, ); + my $socket = IO::Socket::INET->new(PeerAddr => $self->{host} . ':' . $self->{port}, + Timeout => $self->{keep_alive_timer}, ); + + # Can't use this at this time + # $socket = new main::Socket_Item(undef, undef, "$host:$port", $instance); return if(!defined($socket)); - $MQTT_Data{$instance}{'socket'} = $socket; - $MQTT_Data{$instance}{'got_ping_response'} = 1; # Why 1? - $MQTT_Data{$instance}{'next_ping'} = 0; # - # -------------------------------------------------------------------------- - # We're good to here (socket is connected) + $self->{socket} = $socket; + $self->{got_ping_response} = 1; + $self->{next_ping} = $self->{keep_alive_timer}; + # -------------------------------------------------------------------------- ### 2) Send MQTT_CONNECT - send_mqtt_msg($socket, message_type => MQTT_CONNECT, keep_alive_timer => $keep_alive_timer); + $self->send_mqtt_msg(message_type => MQTT_CONNECT, keep_alive_timer => $self->{keep_alive_timer}); - # Bur when we get here, poof, socket is closed ### 3) Check for ACK or fail + &main::print_log("*** mqtt Socket check ($$self{keep_alive_timer}) [ $! ]: " . ($self->isConnected() ? "Connected" : "Failed")) if($main::Debug{mqtt}); - &main::print_log("*** NJC Socket check ($keep_alive_timer) [$!]: " . ($socket->connected ? "Connected" : "Failed")); - my $msg = read_mqtt_msg_timeout($socket, $buf); + my $msg = read_mqtt_msg_timeout($self, $buf); if(!$msg) { - &main::print_log ("XXX NJC No ConnAck "); + &main::print_log ("XXX mqtt No ConnAck "); exit 1; return; } - &main::print_log ("*** NJC Received: " . $msg->string . "\n") if ($verbose >= 2); + # We should actually get a SubAck but who is checking (yes, I know I should) + &main::print_log ("*** mqtt Received: " . $msg->string) if ($main::Debug{mqtt}); ### 4) Send a subscribe '#' (we'll have many of these, one for each device) ### I don't know if this is a good idea or not but that's what I intend to do for now - send_mqtt_msg($socket, message_type => MQTT_SUBSCRIBE, - message_id => $msg_id++, - topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } $topic ]); + $self->send_mqtt_msg(message_type => MQTT_SUBSCRIBE, + message_id => $msg_id++, + topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } $self->{topic} ]); - ### 5) Check for ACK or fail$msg = read_mqtt_msg($socket, $buf) or die "No SubAck\n"; - print 'Received: ', $msg->string, "\n" if ($verbose >= 2); + ### 5) Check for ACK or fail + $msg = $self->read_mqtt_msg($buf) or &main::print_log ("*** mqtt Received: ". "No SubAck"); + &main::print_log ("*** mqtt $$self{instance} Sub 1 Received: " . "$$msg{string}") if($main::Debug{mqtt}); ### 6) check for data - &main::print_log ("*** NJC Initializing MQTT connection ..."); + &main::print_log ("*** mqtt Initializing MQTT connection ...") if($main::Debug{mqtt}); +} +# ------------------------------------------------------------------------------ +=item C +=cut +sub isConnected { + my ($self) = @_; + + return $$self{socket}->connected; +} +# ------------------------------------------------------------------------------ +=item C + + Used to send commands to the interface. + +=cut +sub new { + my ($class, $instance, $host, $port, $topic, $keep_alive_timer) = @_; + + my $self = {}; + + # If we have already created a socket and have an existing instance then + # return the existing instance. MQTT doesn't like having 2 sockets to the + # same server and will close the old socket. + # But what should I do about the new topic. I'll need to subscribe to the + # topic before returning the existing instance + foreach my $inst (keys %MQTT_Data) { + if("$MQTT_Data{$inst}{self}{host}" eq "$host") { + if("$MQTT_Data{$inst}{self}{port}" eq "$port") { + # subscribe to the topic if it doesn't already exist + if("$MQTT_Data{$inst}{self}{topic}" ne "$topic") { + # Old, existing instace with the same host and port info + $self = $MQTT_Data{$inst}{self}; + + # Subscribe to the new topic + send_mqtt_msg($self, message_type => MQTT_SUBSCRIBE, + message_id => $msg_id++, + topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } $topic ]); + + ### 5) Check for ACK or fail + $buf = ''; + my $msg = read_mqtt_msg($self, $buf) or &main::print_log ("*** mqtt Received: ". "No SubAck"); + &main::print_log ("*** mqtt $inst Sub 2 Received: " . $msg->string) if($main::Debug{mqtt}); + } + + # This is the little messages that appear when MH starts + &main::print_log ("*** Reusing $inst (instead of $instance) on $host:$port $topic"); + + return $MQTT_Data{$inst}{self}; + } + } + } + + # This is the little messages that appear when MH starts + &main::print_log ("*** Creating $instance on $host:$port $topic");; + + $$self{state} = ''; + $$self{said} = ''; + $$self{state_now} = ''; - #return ; + @{$$self{command_stack}} = (); + + $$self{instance} = $instance; + + $$self{host} = $host || "127.0.0.1"; + $$self{port} = $port || 1883; + # Use the wildcard here, not in the mqtt_Item + $$self{topic} = "$topic" || "home/ha/#"; + # Currently not used + #$$self{user} = $user || "guest"; + # Currently not used + #$$self{password} = $password || "guest"; + $$self{keep_alive_timer} = $keep_alive_timer || 120; + + # + $$self{next_ping} = 0; + $$self{got_ping_response} = 1; # We really don't use this (yet) + + bless $self, $class; + + $MQTT_Data{$instance}{self} = $self; + + if($main::Debug{mqtt}) { + &main::print_log("*** Opening MQTT ($instance) connection to $$self{host}/$$self{port}/$$self{topic}"); + &main::print_log("*** Host = $$self{host}"); + &main::print_log("*** Port = $$self{port}"); + &main::print_log("*** Topic = $$self{topic}"); + &main::print_log("*** Keep Alive = $$self{keep_alive_timer}"); + } + ### ------------------------------------------------------------------------ + $self->mqtt_connect(); + + &main::print_log("\n***\n*** Hmm, this is not good!, can't find myself\n***\n") unless $self; + return unless $self; + + # Hey what happens when we fail ? + #$MQTT_Data{$instance}{self} = $self; if (1 == scalar(keys %MQTT_Data)) { # Add hooks on first call only - &main::print_log ("*** NJC added MQTT poll ..."); - &::MainLoop_pre_add_hook(\&mqtt::check_for_data, 1); + &main::print_log ("*** mqtt added MQTT check_for_data ..."); + &::MainLoop_pre_add_hook(\&mqtt::check_for_data, 1); } else { - &main::print_log ("*** NJC already added MQTT poll ..." . scalar(keys %MQTT_Data) ); - &main::print_log ("*** NJC already added MQTT poll ... but that's okay" ); + &main::print_log ("*** mqtt already added MQTT poll ..." . scalar(keys %MQTT_Data) ); + &main::print_log ("*** mqtt already added MQTT poll ... but that's okay" ); #exit 1; } - ### 6a) publish a hello to our initial subscription - ### 6b) check for ping response (hmmm, need to thing about this) + + return $self; } # ------------------------------------------------------------------------------ # Handle device I/O: Read and write messages on the bus =item C -Called at the start of every loop. This checks either the serial or server port -for new data. If data is found, the data is broken down into individual -messages and sent to C to be parsed. The message is then -compared to the previous data received if this is a duplicate message it is -logged and ignored. If this is a new message it is sent to C. - -03/13/15 11:15:14 AM *** NJC read_mqtt_msg Receive buffer: - 30 1d 00 0c 68 6f 6d 65 2f 68 61 2f 74 65 73 74 0...home/ha/test - 64 65 70 72 65 63 61 74 65 64 20 61 6c 73 6f deprecated also -03/13/15 11:15:14 AM *** NJC rcv'd: Publish/at-most-once home/ha/test - 64 65 70 72 65 63 61 74 65 64 20 61 6c 73 6f deprecated also - -We see this every $keep_alive_timeout seconds -03/13/15 11:17:08 AM *** NJC read_mqtt_msg Receive buffer: - d0 00 .. -03/13/15 11:17:08 AM *** NJC Received: PingResp/at-most-once + Called at the start of every loop. This checks for new data. =cut my @outqueue = (); # Queue of messages to be sent -my $count = 0; # Number of passes since last message sent sub check_for_data { foreach my $inst (keys %MQTT_Data) { - my $socket = $MQTT_Data{$inst}{'socket'}; - my $self = $MQTT_Data{$inst}{self}; + my $self = $MQTT_Data{$inst}{self}; + ### MQTT stuff below # This one doesn't block - my $msg = read_mqtt_msg($socket, $buf); + my $msg = read_mqtt_msg($self, $buf); + + ### -[ Input ]---------------------------------------------------------- if ($msg) { ### @@ -303,40 +360,38 @@ sub check_for_data { ### the top is the address, the message the value ### if ($msg->message_type == MQTT_PUBLISH) { - &main::print_log ("*** NJC check_for_data R: Dumper: " . Dumper($msg)); ### ### Someone published something, deal with it ### - if ($verbose == 0) { - &main::print_log ("*** NJC check_for_data rcv'd: T:" . $msg->topic, ", M:", $msg->message); - } else { - &main::print_log ("*** NJC check_for_data rcv'd: T:" . $msg->topic, ", M:", $msg->message); - &main::print_log ("*** NJC check_for_data rcv'd: S:" . $msg->string . ","); + if ($main::Debug{mqtt}) { + &main::print_log ("*** mqtt check_for_data rcv'd: T:" . $msg->topic, ", M:", $msg->message); + &main::print_log ("*** mqtt check_for_data rcv'd: S:" . $msg->string . ","); } - #main::print_log("$port_name got: [$::Serial_Ports{$port_name}{data_record}]"); - #$MQTT_Data{$instance}{'obj'}->_parse_data_to_obj($msg); - #$self->parse_data_to_obj($msg); - ### ### So we want the topic and the message - ### topic = address or identifier of the device - ### message = is the value of the device + ### $msg->topic = address or identifier of the device + ### $msg->message = is the value of the device ### - ### For now let's not get too fancy - ### $self->{$msg->topic}->{message} = $msg->message - ### $self->{$msg->topic}->{timestamp} = timestamp + ### We also need the instance to know who set the obj ### + $self->parse_data_to_obj($msg, $inst); + } elsif ($msg->message_type == MQTT_PINGRESP) { - $got_ping_response = 1; - &main::print_log ('*** NJC check_for_data Ping rcvd: ' . $msg->string) if ($verbose >= 3); + $$self{got_ping_response} = 1; + &main::print_log ("*** mqtt $inst check_for_data Ping rcvd: " . $msg->string) if ($main::Debug{mqtt}); } else { - &main::print_log ("*** NJC check_for_data Received: $msg->string") if ($verbose >= 2); + # "$msg->string" + # Net::MQTT::Message::SubAck=HASH(0x2da94e0)->string + &main::print_log ("*** mqtt $inst check_for_data Received: " . $msg->string) if ($main::Debug{mqtt}); } } + ### -[ Output ]--------------------------------------------------------- +=begin comment # This is where we need to check for outgoing - # Check for output + # I think we're writing directly at the moment, So I probably won't need + # this. Not sure which is better if ($#outqueue >= 0) { my $mref = shift @outqueue; @@ -344,174 +399,388 @@ sub check_for_data { ### Okay this is the other hard part ### #send_mqtt_msg($mref)); - &main::print_log ("*** NJC check_for_data send_mqtt_msg $mref "); + &main::print_log ("*** mqtt $inst check_for_data send_mqtt_msg $mref ") if ($main::Debug{mqtt}); } +=cut + ### -[ Ping ]----------------------------------------------------------- - } # End of foreach $socket + ### + ### We need to deal with the mqtt ping for each socket, not 1 ping for all + ### + # Ping check + if (Time::HiRes::time > $$self{next_ping}) { + ### + ### We've exceeded the ping time + ### + &main::print_log ("*** mqtt $inst read_mqtt_msg Ping Response timeout.") unless ($$self{got_ping_response}); + ### + ### This has confused me, I'm not certain if I should put it back in or not + ### I'll need to sit down a put together a state table and review this + ### + # return unless ($$self{got_ping_response}); - ### - ### We need to deal with the pint for each socket, not 1 ping for all - ### - # Ping check - if (Time::HiRes::time > $next_ping) { - &main::print_log ("*** NJC read_mqtt_msg Ping Response timeout.") unless ($got_ping_response); - return unless ($got_ping_response); - send_mqtt_msg($socket, message_type => MQTT_PINGREQ); - } + &main::print_log ("*** mqtt $inst read_mqtt_msg Ping Request") if ($main::Debug{mqtt}); + send_mqtt_msg($self, message_type => MQTT_PINGREQ); + $$self{got_ping_response} = 0; + } + } # End of foreach $socket } # ------------------------------------------------------------------------------ -=item C<()> +=item C<(send_mqtt_msg)> =cut sub send_mqtt_msg { - my $socket = shift; + my $self = shift; my $msg = Net::MQTT::Message->new(@_); $msg = $msg->bytes; # syswrite ? - syswrite $socket, $msg, length $msg; - $next_ping = Time::HiRes::time + $keep_alive_timer; + syswrite $$self{socket}, $msg, length $msg; + # Reset the next_ping timer (we sent something so we don't need another ping + # until now+keep_alive_timer + $$self{next_ping} = Time::HiRes::time + $$self{keep_alive_timer}; } # ------------------------------------------------------------------------------ =item C =cut sub read_mqtt_msg { - my $socket = shift; - my $select = IO::Select->new($socket); - my $timeout = $next_ping - Time::HiRes::time; + my $self = shift; + + my $select = IO::Select->new($$self{socket}); + my $timeout = $$self{next_ping} - Time::HiRes::time; do { ### ### I really need to sit down and figure this out ### my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1); - - #&main::print_log ("*** NJC read_mqtt_msg n " . $_[0]); - - #print "0"; - #&main::print_log ("*** NJC read_mqtt_msg 0"); + # + # I am a little confused by this + # return $mqtt if (defined $mqtt); - #print "1"; - #&main::print_log ("*** NJC read_mqtt_msg 1"); ### very short wait ### Return if there is no data $select->can_read(0.1) || return; - #print "2"; - #&main::print_log ("*** NJC read_mqtt_msg 2"); - $timeout = $next_ping - Time::HiRes::time; - - #print "3"; - #&main::print_log ("*** NJC read_mqtt_msg 3"); - # Sysread ? sysread FILEHANDLE,SCALAR,LENGTH,OFFSET - my $bytes = sysread $socket, $_[0], 2048, length $_[0]; + # + $timeout = $$self{next_ping} - Time::HiRes::time; - #print "4"; - #&main::print_log ("*** NJC read_mqtt_msg 4"); + # can return undef (error) or 0 bytes (eof) + my $bytes = sysread $$self{socket}, $_[0], 2048, length $_[0]; + # We get no bytes if there is an error or the socket has closed unless ($bytes) { - &main::print_log ("*** NJC read_mqtt_msg Socket closed " . (defined $bytes ? 'gracefully' : 'error')); + &main::print_log ("*** mqtt $$self{instance}: read_mqtt_msg Socket closed " . (defined $bytes ? 'gracefully ' : "with error [ $! ]")); + # Not a permanent solution just a way to keep debugging + &main::print_log ("*** mqtt deleting $$self{instance}\n" . Dumper(\$self)) if ($main::Debug{mqtt}); + delete($MQTT_Data{$$self{instance}}); + return; } - - #&main::print_log ("*** NJC read_mqtt_msg Receive buffer: " . dump_string($_[0])) if ($verbose >= 3); } while ($timeout > 0); - - #&main::print_log ("*** NJC read_mqtt_msg 5"); - return; } # ------------------------------------------------------------------------------ =item C =cut sub read_mqtt_msg_timeout { - my $socket = shift; - &main::print_log ("*** NJC read_mqtt_msg_timeout Socket " . ($socket->connected ? "Connected" : "Failed")); - my $select = IO::Select->new($socket); - my $timeout = $next_ping - Time::HiRes::time; + my $self = shift; - do { - my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1); + my $select = IO::Select->new($$self{socket}); + my $timeout = $$self{next_ping} - Time::HiRes::time; - #&main::print_log ("*** NJC read_mqtt_msg n " . $_[0]); + do { + my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1); - #print "0"; - #&main::print_log ("*** NJC read_mqtt_msg 0"); - return $mqtt if (defined $mqtt); + return $mqtt if (defined $mqtt); - #print "1"; - &main::print_log ("*** NJC read_mqtt_msg 1 ($timeout)"); - ### This is where it waits (blocking) - ### - $select->can_read($timeout) || return; + ### + ### This is where it waits (blocking) + ### + $select->can_read($timeout) || return; - #print "2"; - #&main::print_log ("*** NJC read_mqtt_msg 2"); - $timeout = $next_ping - Time::HiRes::time; + # + $timeout = $$self{next_ping} - Time::HiRes::time; - #print "3"; - #&main::print_log ("*** NJC read_mqtt_msg 3"); - # Sysread ? sysread FILEHANDLE,SCALAR,LENGTH,OFFSET - my $bytes = sysread $socket, $_[0], 2048, length $_[0]; + # can return undef (error) or 0 bytes (eof) + my $bytes = sysread $$self{socket}, $_[0], 2048, length $_[0]; - &main::print_log ("*** NJC sysread Socket " . ($socket->connected ? "Connected" : "Failed")); + # We get no bytes if there is an error or the socket has closed + unless ($bytes) { + &main::print_log ("*** mqtt $$self{instance}: read_mqtt_msg Socket closed " . (defined $bytes ? 'gracefully ' : "with error [ $! ]")); + # Not a permanent solution just a way to keep debugging + &main::print_log ("*** mqtt deleting $$self{instance}\n" . Dumper(\$self)) if ($main::Debug{mqtt}); + delete($MQTT_Data{$$self{instance}}); - #print "4"; - #&main::print_log ("*** NJC read_mqtt_msg 4"); + return; + } + } while ($timeout > 0); +} +# ------------------------------------------------------------------------------ +=item C +=cut +sub set { + my ($self, $msg, $set_by) = @_; - unless ($bytes) { - &main::print_log ("*** NJC read_mqtt_msg Socket closed " . (defined $bytes ? 'gracefully' : 'error') . " ($timeout)($!)"); - return; - } + my $data; - &main::print_log ("*** NJC read_mqtt_msg Receive buffer: " . dump_string($_[0])) if ($verbose >= 3); - } while ($timeout > 0); + ### + ### Okay here is the hard part + ### I need the instance socket, the obj's topic and message + ### in order to send the message + ### + $$self{instance}->pub_msg(message_type => MQTT_PUBLISH, + retain => $$self{retain}, + topic => $$self{topic}, + message => $msg); +} +# ------------------------------------------------------------------------------ +=item C<(pub_msg())> +=cut +### +### We're writing direct but that's okay as it's not like we're waiting on a +### serial port where things can really get backed up (Hmm, are we blocking on +### a write?) +### +sub pub_msg { + my $self = shift; + + $self->send_mqtt_msg(@_); +} +# ------------------------------------------------------------------------------ +=item C<(add)> +=cut +sub add { + my ($self, @p_objects) = @_; + + my @l_objects; - #&main::print_log ("*** NJC read_mqtt_msg 5"); - return; + for my $l_object (@p_objects) { + if ($l_object->isa('Group_Item') ) { + @l_objects = $$l_object{members}; + for my $obj (@l_objects) { + $self->add($obj); + } + } else { + $self->add_item($l_object); + } + } } + # ------------------------------------------------------------------------------ -=item C +=item C<(add_item)> =cut -sub set { - my ($self, $state, $set_by) = @_; +sub add_item { + my ($self, $p_object) = @_; + + push @{$$self{objects}}, $p_object; + return $p_object; } + # ------------------------------------------------------------------------------ +=item C<(remove_all_items)> +=cut +sub remove_all_items { + my ($self) = @_; + + &main::print_log ("*** mqtt mqtt remove_all_items()"); + =begin comment -=item C + if (ref $$self{objects}) { + foreach (@{$$self{objects}}) { + # $_->untie_items($self); + } + } +=cut + delete $self->{objects}; +} -Used to send commands to the interface. +# ------------------------------------------------------------------------------ +=item C<(add_item_if_not_present)> =cut -sub new { - bless $self, $class; +sub add_item_if_not_present { + my ($self, $p_object) = @_; - return $self; + if (ref $$self{objects}) { + foreach (@{$$self{objects}}) { + if ($_ eq $p_object) { + return 0; + } + } + } + $self->add_item($p_object); + return 1; +} + +# ------------------------------------------------------------------------------ +=item C<(remove_item)> +=cut +sub remove_item { + my ($self, $p_object) = @_; + + if (ref $$self{objects}) { + for (my $i = 0; $i < scalar(@{$$self{objects}}); $i++) { + if ($$self{objects}->[$i] eq $p_object) { + splice @{$$self{objects}}, $i, 1; + # $p_object->untie_items($self); + return 1; + } + } + } + return 0; } # ------------------------------------------------------------------------------ =item C -Take the data and parse it to the MH obj() - -$msg = bless( { 'topic' => 'home/ha/test/x10/A1', - 'remaining' => '', - 'retain' => 1, - 'dup' => 0, - 'message_type' => 3, - 'qos' => 0, - 'message' => 'A1AON' - }, - 'Net::MQTT::Message::Publish' -); + Take the data and parse it to the MH obj() + + $msg = bless( { + 'topic' => 'home/ha/test/x10/1', + 'remaining' => '', + 'retain' => 1, + 'dup' => 0, + 'message_type' => 3, + 'qos' => 0, + 'message' => 'deprecated also' + }, + 'Net::MQTT::Message::Publish' + ); =cut sub parse_data_to_obj { - my ($self, $msg) = @_; + my ($self, $msg, $p_setby) = @_; + # for my $obj (@{$$self{objects}}) { - $obj->set($msg,$self); + if("$$obj{topic}" eq "$$msg{topic}") { + $obj->set($$msg{message}, $self,); + } else { + #&main::print_log ("***mqtt mqtt obj ($$obj{topic}) vs ($$msg{topic})"); + } } + +=begin comment +=cut } +# -[ Fini - mqtt ]-------------------------------------------------------------- + +package mqtt_Item; + +use strict; + +use Net::MQTT::Constants qw/:all/; + +use Data::Dumper; + +@mqtt_Item::ISA = ( 'Generic_Item', "mqtt" ); + +=item C + + Creates a MQTT Item/object. The following parameter are required: + + =over + + =item name: the 'friendly' name of the squeezebox in squeezecenter. This parameter is used to link this object to the correct status messages in the CLI interface of squeezecenter + + =item interface: the object that is the CLI interface to assign this player to. + + =back + + The following parameters are optional + + =over + + =item amplifier: the object that needs to be enabled and disabled together with the squeezebox + + =item auto_off_time: the time (in minutes) the squeezebox and the optional attached amplifier should be turned off after a playlist has ended + + =item preheat_time: the time (in seconds) the amplifier should be turned on before a notification is played if the amplifier is off. This enables the amplifier to turn on and enable the speakers before the notification is played. + + =back + + $msg = bless( { 'topic' => 'home/ha/test/x10/1', + 'remaining' => '', + 'retain' => 1, + 'dup' => 0, + 'message_type' => 3, + 'qos' => 0, + 'message' => 'deprecated also' +}, + 'Net::MQTT::Message::Publish' + ); +=cut + +sub new { + my ( $class, $instance, $topic, $qos, $retain ) = @_; + + my $self = new Generic_Item(); + + bless $self, $class; + + $self->interface($instance) if defined $instance; + + $$self{topic} = $topic; + $$self{message} = ''; + $$self{retain} = $retain || 0; + $$self{QOS} = $qos || 0; + + $$self{instance}->add($self); + + # We may need flags to deal with XML, JSON or Text + return $self; +} + +=item C +=cut +sub interface { + my ($self,$p_instance) = @_; + + $$self{instance} = $p_instance if defined $p_instance; + + return $$self{instance}; +} + +=item C +=cut +sub set { + my ($self, $msg, $p_setby, $p_response) = @_; + + # prevent reciprocal sets that can occur because of this method's state + # propogation + # FIXME: Use of uninitialized value in string eq at /home/njc/dev/mh/bin/../lib/mqtt.pm line 752 + #return if (ref $p_setby and $p_setby->can('get_set_by') and + # $p_setby->{set_by} eq $self); ### FIXME: Use of uninitialized value in string eq at /home/njc/dev/mh/bin/../lib/mqtt.pm 771 + + if (defined($p_setby) && $p_setby eq $self->interface()) { + ### + ### Incoming (MQTT to MH) + ### + &::print_log("*** mqtt mqtt_Item nom to MH " . $self->get_object_name() . "::set($msg, $p_setby)") if $main::Debug{mqtt}; + } else { + ### + ### Outgoing (MH to MQTT) + ### + #&::print_log("*** mqtt mqtt_Item nom to MQTT " . $self->get_object_name() . "::set($msg, $p_setby)") if $main::Debug{mqtt}; + &::print_log("*** mqtt mqtt_Item nom to MH " . $self->get_object_name() . ' no ::set($msg, $p_setby)') if $main::Debug{mqtt}; + + ### + ### I need the instance socket, the obj's topic and message + ### in order to send the message + ### + $$self{instance}->pub_msg(message_type => MQTT_PUBLISH, + retain => $$self{retain}, + topic => $$self{topic}, + message => $msg); + } + + $self->SUPER::set($msg, $p_setby,$p_response) if defined $msg; +} + +# -[ Fini - mqtt_Item ]--------------------------------------------------------- + # -[ Fini ]--------------------------------------------------------------------- +1; -return 1; +=begin comment +=cut From 0fc41119e0483afa22aeed62bde3f045c20d3758 Mon Sep 17 00:00:00 2001 From: Neil Cherry Date: Mon, 30 Mar 2015 15:26:39 -0400 Subject: [PATCH 3/4] Further work on the mqtt module I've made a number of changes to allow more than one mqtt instance. This is meant to provide support for multiple hosts and ports pairs. I noticed that using the same host:port would cause the second request to be disconnected. So I reused the first host:port instance (I really only wanted the socket). So I reused the instance and issued a new subscription. This has man side effects that need to be worked out. There are a number of other fix "me"s and todos (listed in the source). Reconnect handleing among the top issues. See Usage and Example in the source file for changes to instanctiation. --- data/organizer/calendar.tab | 20 -- lib/mqtt.pm | 473 +++++++++++++++++++++++++++++------- 2 files changed, 382 insertions(+), 111 deletions(-) delete mode 100644 data/organizer/calendar.tab diff --git a/data/organizer/calendar.tab b/data/organizer/calendar.tab deleted file mode 100644 index 6fc335c54..000000000 --- a/data/organizer/calendar.tab +++ /dev/null @@ -1,20 +0,0 @@ -ID DATE TIME EVENT CATEGORY DETAILS HOLIDAY VACATION SOURCE REMINDER ENDTIME -1 2001.10.2 10:20 Test Event Test This is a test event to make sure data entry is working. -2 2002.3.16 2:57 pm test -3 2002.3.19 12:52 pm test -4 2002.3.29 6:27 pm test -5 2002.3.29 7:26 pm test 2 -6 2002.4.20 7 pm Look at the stars -7 2002.5.6 10:20 PM test it's now "time" -8 2002.5.6 10:19 PM test hi there -9 2002.6.2 2 pm David's Open House -10 2002.6.2 3 pm David's Open House -11 2002.6.2 4 pm David's Open House -12 2002.6.6 8 am Pay Estimated Tax -13 2002.6.4 test -14 2002.6.8 Seep in -15 2002.6.6 Mow the lawn -16 2002.6.30 8:18 pm test again -17 2002.10.12 1:15 pm test1 -18 2002.10.12 8 pm test2 hi -19 2005.1.18 9 pm Chris radio show http://www.thespacereport.com/ diff --git a/lib/mqtt.pm b/lib/mqtt.pm index 6ec77e3b2..a9269e328 100644 --- a/lib/mqtt.pm +++ b/lib/mqtt.pm @@ -2,6 +2,21 @@ =begin comment @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ + =head1 B + + Neil Cherry + + =head2 SYNOPSIS + + A MQTT Interface and Item module for Misterhouse + + =head2 DESCRIPTION + + Misterhouse MQTT interface for use with any MQTT service + + MQTT website: http://mqtt.org/ + MQTT Test service: http//test.mosquitto.org/ (test.mosquitto.org port 1883) + File: mqtt.pm @@ -17,7 +32,7 @@ Description: Based loosely on the UPMPIM.pm and SqueezeCLI.pm code - Jason Sharpee (UPB) - - (SqueezeCLI) + - Lieven Hollevoet (SqueezeCLI) License: This free software is licensed under the terms of the GNU public license. @@ -27,17 +42,33 @@ Usage: .mht file: # MQTT stuff - CODE, require mqtt; #noloop - CODE, $mqtt_1 = new mqtt('mqtt_1', '127.0.0.1', 1883, 'home/ha/#', 121); #noloop - #CODE, $mqtt_2 = new mqtt('mqtt_2', 'test.mosquitto.org', 1883, 'home/test/#', 122); #noloop - #CODE, $mqtt_3 = new mqtt('mqtt_3', '127.0.0.1', 1883, 'home/network/#', 122); #noloop - CODE, $CR_Temp = new mqtt_Item($mqtt_1, "home/ha/text/x10/1"); #noloop - - CODE, $CR_Temp->set("Off"); + CODE, require mqtt; #noloop + # + CODE, $mqtt_1 = new mqtt('mqtt_1', '127.0.0.1', 1883, 'home/ha/#', "", "", 121); + CODE, $mqtt_2 = new mqtt('mqtt_2', 'test.mosquitto.org', 1883, 'home/test/#', "", "", 122); + CODE, $mqtt_3 = new mqtt('mqtt_3', '127.0.0.1', 1883, 'home/network/#', "", "", 122); #noloop + # + CODE, $CR_Temp = new mqtt_Item($mqtt_1, "home/ha/text/x10/1"); + CODE, $M2_Temp = new mqtt_Item($mqtt_2, "test.mosquitto.org/test/x10/1"); + CODE, $M3_Temp = new mqtt_Item($mqtt_3, "home/network/test/x10/1"); + # + CODE, $CR_Temp->set("On"); + CODE, $M2_Temp->set("Off"); + CODE, $M3_Temp->set("On"); + + and my mqtt.pl in my code dir: + + # + if ($New_Minute and !($Minute % 30)) { + my $state = ('on' eq state $M2_Temp) ? 'off' : 'on'; + set $M2_Temp $state; + my $remark = "M2 Light set to $state"; + print_log "$remark"; + } CLI generation of a command to the CR_Temp - mosquitto_pub -d -h 127.0.0.1 -q 0 -t home/ha/text/x10/1 -m "Off" + mosquitto_pub -d -h test.mosquitto.org -q 0 -t test.mosquitto.org/test/x10/1 -m "Off" Example initialization: @@ -49,32 +80,11 @@ Notes: Special Thanks to: Bruce Winter - MH Jason Sharpee - MH UPB pkg + Lieven Hollevoet - SqueezeCLI.pm @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ - =head1 B - - Neil Cherry - - =head2 SYNOPSIS - - NONE - - =head2 DESCRIPTION - - Misterhouse MQTT interface for use with any MQTT service - - MQTT website: http://mqtt.org/ - -Notes: - -Need: - MQTT Device - test.mosquitto.org - 1883 - or - mozart.uucp - 1883 + =head2 B The test.mosquitto.org server listens on ports 1883, 8883, 8884 and 8885. Port 1883 is the standard unencrypted MQTT port and can be used @@ -84,46 +94,47 @@ Need: Web sockets are not supported here in MH - MQTT Item Pub (WO) - Device - 'topic' - - MQTT Item Sub (RO) - Device - 'topic' - Topics (examples) - # - /ha/# - /ha/house/livingroom/lamp/ - /ha/weather/temp - /ha/weather/windspeed/ + # + ha/# + ha/house/livingroom/lamp + ha/weather/temp + ha/weather/windspeed For now we'll use the wildcard. I'll think about a rewrite later without the wild card support. I don't recommend using the '#' if the MQTT is test.mqtt.org Rather pick something a bit more unique like /username/ha/# and play from there Because of the wildcard it probably makes sense to support multiple mqtt - connections. This would allow for: + connections on 1 or more servers. This would allow for: - home/ha/x10/# - home/ha/z-wave/# - home/ha/zigbee/# + home/ha/x10/# + home/ha/z-wave/# + home/ha/zigbee/# instead of: - # + home/# + + or + + # Which could cover things like this: - home/email/... - home/statistics/... - offsite/... + home/email/... + home/statistics/... + offsite/... + + and just about everything else on this server too. :-) + + If you're using a home mqtt server then this might not be such as issue. + Give this command a try and see the amount of traffic generated: - If you're using a home mqtt then this might not be such as issue. + mosquitto_sub -d -h test.mosquitto.org -t "#" - The intial device needs some kind of way to tell that it's still connected to - the MQTT server (MQTT Ping comes to mind). + The intial device needs some kind of way to tell that it's still connected + to the MQTT server (MQTT Ping comes to mind). =head2 INHERITS @@ -135,6 +146,25 @@ Need: =item B + =item B + + There are a number of things that need to be done. There is a lack of error + checking and connectivity checks and restoration. I'm sure there are a huge + number of features that need to be added. + + @FIXME: Topic handling needs work, if the same host:port is used the first + instance (same socket) is used but this causes issues and at + reconnect we need to resubscribe. There is no way to do that now + (we'll need to resubscribe all the same socket related subscriptions) + @FIXME: user_name/password isn't working, I don't know why yet + @FIXME: We're really not checking for ConnAck or SubAck. + @FIXME: there is no reconnect logic + @FIXME: No SSL + @FIXME: Lots of error checking needs to be done + @FIXME: Use of uninitialized value + @TODO: Callbacks to handle decoding of the tiopic messages + @TODO: Analog device and string device handling + =cut # ------------------------------------------------------------------------------ use constant { TRUE => 1, FALSE => 0 }; @@ -161,23 +191,98 @@ use Data::Dumper; eval "use bytes"; # Not on all installs, so eval to avoid errors # Need to share this with the outside world -my $verbose = 5; my $buf = ''; my $msg_id = 1; my %MQTT_Data; +$main::Debug{mqtt} = 1; # ------------------------------------------------------------------------------ sub dump() { &main::print_log("*** mqtt Dumper (MQTT_Data):\n" . Dumper(\%MQTT_Data) . "***"); } +# ------------------------------------------------------------------------------ +=item + +Okay, I can see this is going to get complicated and require I do a rewrite of +the subscription handling. When we reconnect we want to also resubscribe. +Currently we can't do that. + +=cut +sub mqtt_reconnect() { + my ($self) = @_; + + ### + ### Do we need to do a clean up on the existing socket before we reconnect? + ### Will a close do that for us ? + ### + $$self{socket}->close(); + + &main::print_log("*** mqtt $$self{instance} mqtt_connect Socket ($$self{host}:$$self{port},$$self{keep_alive_timer}) ") if($main::Debug{mqtt}||1); + + ### 1) open a socket (host, port and keepalive + my $socket = IO::Socket::INET->new(PeerAddr => $self->{host} . ':' . $self->{port}, + Timeout => $self->{keep_alive_timer}, ); + + # Can't use this at this time + # $socket = new main::Socket_Item(undef, undef, "$host:$port", $instance); + + &main::print_log("*** mqtt $$self{instance} Socket check #1 ($$self{keep_alive_timer}) [ $! ]: " . ($self->isConnected() ? "Connected" : "Failed")) if($main::Debug{mqtt}); + return if(!defined($socket)); + + $self->{socket} = $socket; + $self->{got_ping_response} = 1; + $self->{next_ping} = $self->{keep_alive_timer}; + + # -------------------------------------------------------------------------- + ### 2) Send MQTT_CONNECT + $self->send_mqtt_msg(message_type => MQTT_CONNECT, keep_alive_timer => $self->{keep_alive_timer}); + + ### 3) Check for ACK or fail + &main::print_log("*** mqtt $$self{instance} Socket check #2 ($$self{keep_alive_timer}) [ $! ]: " . ($self->isConnected() ? "Connected" : "Failed")) if($main::Debug{mqtt}); + + my $msg = read_mqtt_msg_timeout($self, $buf); + if(!$msg) { + &main::print_log ("XXX mqtt $$self{instance} No ConnAck "); + #exit 1; + return; + } + + # We should actually get a SubAck but who is checking (yes, I know I should) + &main::print_log ("*** mqtt $$self{instance} Received: " . $msg->string) if ($main::Debug{mqtt}); + + ### ------------------------------------------------------------------------ + + ### + ### Here is where we need to make the changes to support multiple + ### subscriptions. + ### + + ### 4) Send a subscribe '#' (we'll have many of these, one for each device) + ### I don't know if this is a good idea or not but that's what I intend to do for now + $self->send_mqtt_msg(message_type => MQTT_SUBSCRIBE, + message_id => $msg_id++, + topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } $self->{topic} ]); + + ### 5) Check for ACK or fail + ### we really should check for a SubAck and that it's the correct SubAck + $msg = $self->read_mqtt_msg_timeout($buf) or &main::print_log ("*** mqtt $$self{instance} Received: ". "No SubAck"); + &main::print_log ("*** mqtt $$self{instance} Sub 1 Received: " . "$$msg{string}") if($main::Debug{mqtt}); + + ### ------------------------------------------------------------------------ + + ### 6) check for data + &main::print_log ("*** mqtt $$self{instance} Initializing MQTT re_connection ...") if($main::Debug{mqtt}); +} # ------------------------------------------------------------------------------ =item =cut sub mqtt_connect() { my ($self) = @_; + &main::print_log("*** mqtt mqtt_connect Socket ($$self{host}:$$self{port},$$self{keep_alive_timer}) ") if($main::Debug{mqtt}||1); + ### 1) open a socket (host, port and keepalive my $socket = IO::Socket::INET->new(PeerAddr => $self->{host} . ':' . $self->{port}, Timeout => $self->{keep_alive_timer}, ); @@ -200,13 +305,13 @@ sub mqtt_connect() { my $msg = read_mqtt_msg_timeout($self, $buf); if(!$msg) { - &main::print_log ("XXX mqtt No ConnAck "); - exit 1; + &main::print_log ("XXX mqtt $$self{instance} No ConnAck "); + #exit 1; return; } # We should actually get a SubAck but who is checking (yes, I know I should) - &main::print_log ("*** mqtt Received: " . $msg->string) if ($main::Debug{mqtt}); + &main::print_log ("*** mqtt $$self{instance} Received: " . $msg->string) if ($main::Debug{mqtt}); ### 4) Send a subscribe '#' (we'll have many of these, one for each device) ### I don't know if this is a good idea or not but that's what I intend to do for now @@ -215,11 +320,17 @@ sub mqtt_connect() { topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } $self->{topic} ]); ### 5) Check for ACK or fail - $msg = $self->read_mqtt_msg($buf) or &main::print_log ("*** mqtt Received: ". "No SubAck"); - &main::print_log ("*** mqtt $$self{instance} Sub 1 Received: " . "$$msg{string}") if($main::Debug{mqtt}); + $msg = $self->read_mqtt_msg_timeout($buf) or &main::print_log ("*** mqtt $$self{instance} Received: " . "No SubAck"); + if($main::Debug{mqtt}) { + my $s = defined($$msg{string}) ? "($$msg{string})" : '(--No $$msg{string}--)'; + ### + ### IF we're not getting $$msg{string} then what are we getting ? + ### + &main::print_log ("*** mqtt $$self{instance} Sub 1 Received: " . "$s"); # @FIXME: Use of uninitialized value + } ### 6) check for data - &main::print_log ("*** mqtt Initializing MQTT connection ...") if($main::Debug{mqtt}); + &main::print_log ("*** mqtt $$self{instance} Initializing MQTT connection ...") if($main::Debug{mqtt}); } # ------------------------------------------------------------------------------ =item C @@ -230,13 +341,21 @@ sub isConnected { return $$self{socket}->connected; } # ------------------------------------------------------------------------------ +=item C +=cut +sub isNotConnected { + my ($self) = @_; + + return !$$self{socket}->connected; +} +# ------------------------------------------------------------------------------ =item C Used to send commands to the interface. =cut sub new { - my ($class, $instance, $host, $port, $topic, $keep_alive_timer) = @_; + my ($class, $instance, $host, $port, $topic, $user, $password, $keep_alive_timer) = @_; my $self = {}; @@ -260,13 +379,19 @@ sub new { ### 5) Check for ACK or fail $buf = ''; - my $msg = read_mqtt_msg($self, $buf) or &main::print_log ("*** mqtt Received: ". "No SubAck"); + my $msg = read_mqtt_msg($self, $buf) or &main::print_log ("*** mqtt $$self{instance} Received: ". "No SubAck"); &main::print_log ("*** mqtt $inst Sub 2 Received: " . $msg->string) if($main::Debug{mqtt}); } # This is the little messages that appear when MH starts &main::print_log ("*** Reusing $inst (instead of $instance) on $host:$port $topic"); + ### + ### Ran into an issue doing it this way, it renames the object to the last + ### object name it encounters :( + ### I guess what I need to to create a new object but copy everything from the + ### previous one + ### return $MQTT_Data{$inst}{self}; } } @@ -275,22 +400,22 @@ sub new { # This is the little messages that appear when MH starts &main::print_log ("*** Creating $instance on $host:$port $topic");; - $$self{state} = ''; + $$self{state} = 'off'; $$self{said} = ''; - $$self{state_now} = ''; + $$self{state_now} = 'off'; @{$$self{command_stack}} = (); $$self{instance} = $instance; - $$self{host} = $host || "127.0.0.1"; + $$self{host} = "$host" || "127.0.0.1"; $$self{port} = $port || 1883; # Use the wildcard here, not in the mqtt_Item $$self{topic} = "$topic" || "home/ha/#"; # Currently not used - #$$self{user} = $user || "guest"; + $$self{user_name} = "$user" || ""; # Currently not used - #$$self{password} = $password || "guest"; + $$self{password} = "$password" || ""; $$self{keep_alive_timer} = $keep_alive_timer || 120; # @@ -299,6 +424,8 @@ sub new { bless $self, $class; + $self->set_states("off","on"); + $MQTT_Data{$instance}{self} = $self; if($main::Debug{mqtt}) { @@ -306,6 +433,8 @@ sub new { &main::print_log("*** Host = $$self{host}"); &main::print_log("*** Port = $$self{port}"); &main::print_log("*** Topic = $$self{topic}"); + &main::print_log("*** User = $$self{user_name}"); + &main::print_log("*** Password = $$self{password}"); &main::print_log("*** Keep Alive = $$self{keep_alive_timer}"); } ### ------------------------------------------------------------------------ @@ -321,11 +450,12 @@ sub new { &main::print_log ("*** mqtt added MQTT check_for_data ..."); &::MainLoop_pre_add_hook(\&mqtt::check_for_data, 1); } else { - &main::print_log ("*** mqtt already added MQTT poll ..." . scalar(keys %MQTT_Data) ); - &main::print_log ("*** mqtt already added MQTT poll ... but that's okay" ); + #&main::print_log ("*** mqtt already added MQTT poll ..." . scalar(keys %MQTT_Data) ); + #&main::print_log ("*** mqtt already added MQTT poll ... but that's okay" ); #exit 1; } + $self->set('on', $self); return $self; } # ------------------------------------------------------------------------------ @@ -343,6 +473,30 @@ sub check_for_data { ### MQTT stuff below + # Check for connectivity + if($self->isNotConnected()) { + ### + ### This needs a lot of work + ### + ### @FIXME: failed connection + if('off' ne $self->{state}) { + # First say something + &main::print_log ("*** mqtt $inst failed ($$self{host}/$$self{port}/$$self{topic})"); + # Then do something (reconnect) + + # check the state to see if it's off already +=begin comment +03/29/15 11:17:47 AM *** mqtt mqtt_4 failed (m11.cloudmqtt.com/15050/home/network/#) +03/29/15 11:17:47 AM *** mqtt mqtt set mqtt_4: [(off), undefined set_by, $mqtt_4] +03/29/15 11:17:47 AM *** mqtt mqtt set mqtt_4: isa mqtt +=cut + $self->set('off', $self); + } + + # Skip if we're not connected + next ; + } + # This one doesn't block my $msg = read_mqtt_msg($self, $buf); @@ -364,8 +518,8 @@ sub check_for_data { ### Someone published something, deal with it ### if ($main::Debug{mqtt}) { - &main::print_log ("*** mqtt check_for_data rcv'd: T:" . $msg->topic, ", M:", $msg->message); - &main::print_log ("*** mqtt check_for_data rcv'd: S:" . $msg->string . ","); + &main::print_log ("*** mqtt $inst check_for_data rcv'd: T:" . $msg->topic, ", M:", $msg->message); + &main::print_log ("*** mqtt $inst check_for_data rcv'd: S:" . $msg->string . ","); } ### @@ -375,7 +529,7 @@ sub check_for_data { ### ### We also need the instance to know who set the obj ### - $self->parse_data_to_obj($msg, $inst); + $self->parse_data_to_obj($msg, $inst, $self); } elsif ($msg->message_type == MQTT_PINGRESP) { $$self{got_ping_response} = 1; @@ -446,7 +600,7 @@ sub send_mqtt_msg { sub read_mqtt_msg { my $self = shift; - my $select = IO::Select->new($$self{socket}); + my $select = IO::Select->new($$self{socket}); my $timeout = $$self{next_ping} - Time::HiRes::time; do { @@ -523,17 +677,35 @@ sub read_mqtt_msg_timeout { sub set { my ($self, $msg, $set_by) = @_; - my $data; + if($main::Debug{mqtt} || 1) { + my $xStr = defined($msg) ? "($msg)" : "undefined message"; + $xStr .= defined($set_by) ? ", ($set_by)" : ", undefined set_by"; + $xStr .= ", Obj: " . defined($$self{object_name}) ? ", $$self{object_name}" : ", undefined object_name"; # @FIXME: Use of uninitialized value - ### - ### Okay here is the hard part - ### I need the instance socket, the obj's topic and message - ### in order to send the message - ### - $$self{instance}->pub_msg(message_type => MQTT_PUBLISH, - retain => $$self{retain}, - topic => $$self{topic}, - message => $msg); + &main::print_log ("*** mqtt mqtt set $$self{instance}: [$xStr]"); + &main::print_log ($self->isa('mqtt') ? "*** mqtt mqtt set $$self{instance}: isa mqtt" : "*** mqtt mqtt set $$self{instance}: is nota mqtt"); + } + + return unless($msg); + + + if($self->isa('mqtt')) { + # I really want to use this to allow the user to manually + # connect and disconnect the socket + + # + $self->SUPER::set($msg, $set_by) if defined $msg; + } else { + ### + ### Okay here is the hard part + ### I need the instance socket, the obj's topic and message + ### in order to send the message + ### + $$self{instance}->pub_msg(message_type => MQTT_PUBLISH, + retain => $$self{retain}, + topic => $$self{topic}, + message => $msg); + } } # ------------------------------------------------------------------------------ =item C<(pub_msg())> @@ -546,6 +718,16 @@ sub set { sub pub_msg { my $self = shift; + # Check for connectivity + if($self->isNotConnected()) { + # First say something + &main::print_log ("*** mqtt $$self{instance} failed ($$self{host}/$$self{port}/$$self{topic})"); + # Then do something (reconnect) + + # Skip if we're not connected + return ; + } + $self->send_mqtt_msg(@_); } # ------------------------------------------------------------------------------ @@ -750,20 +932,24 @@ sub set { # propogation # FIXME: Use of uninitialized value in string eq at /home/njc/dev/mh/bin/../lib/mqtt.pm line 752 #return if (ref $p_setby and $p_setby->can('get_set_by') and - # $p_setby->{set_by} eq $self); ### FIXME: Use of uninitialized value in string eq at /home/njc/dev/mh/bin/../lib/mqtt.pm 771 + # $p_setby->{set_by} eq $self); if (defined($p_setby) && $p_setby eq $self->interface()) { ### ### Incoming (MQTT to MH) ### - &::print_log("*** mqtt mqtt_Item nom to MH " . $self->get_object_name() . "::set($msg, $p_setby)") if $main::Debug{mqtt}; + &::print_log("*** mqtt mqtt_Item nom to MQTT to MH " . $self->get_object_name() . "::set($msg, $p_setby)") if $main::Debug{mqtt}; } else { ### ### Outgoing (MH to MQTT) ### - #&::print_log("*** mqtt mqtt_Item nom to MQTT " . $self->get_object_name() . "::set($msg, $p_setby)") if $main::Debug{mqtt}; - &::print_log("*** mqtt mqtt_Item nom to MH " . $self->get_object_name() . ' no ::set($msg, $p_setby)') if $main::Debug{mqtt}; - + if($main::Debug{mqtt}) { + if(defined($self->get_object_name())) { + &::print_log("*** mqtt mqtt_Item nom to MH to MQTT (" . $self->get_object_name() . ') no p_setby ::set($msg, $p_setby)'); + } else { + &::print_log('*** mqtt mqtt_Item nom to MH to MQTT () no p_setby ::set($msg, $p_setby)'); + } + } ### ### I need the instance socket, the obj's topic and message ### in order to send the message @@ -783,4 +969,109 @@ sub set { 1; =begin comment +The set_by has me currently puzzled, I'm not sure of it's purpose + +The state, state_now and said aren't exactly helping much either ;-) + +03/24/15 10:19:49 AM *** mqtt mqtt set mqtt_1: [(on), (web [192.168.24.232])] +03/24/15 10:19:49 AM *** mqtt mqtt set mqtt_1: +$VAR1 = \bless( { + 'states' => [ + 'off', + 'on' + ], + 'state_now' => '', + 'state' => '', + 'objects' => [ + bless( { + 'legacy_target' => undef, + 'tied_objects' => {}, + 'states' => [ + 'off', + 'on' + ], + 'QOS' => 0, + 'state_changed' => undef, + 'set_by' => undef, + 'state_prev' => 'x1xon', + 'set_time' => '1427120068', + 'state_now' => undef, + 'state' => 'x1xon', + 'target' => '', + 'setby_next_pass' => [], + 'state_next_pass' => [], + 'state_log' => [ + '03/20/15 02:51:04 PM x1xon set_by=$mqtt_3', + '03/20/15 01:15:50 PM x1xon set_by=$mqtt_1', + '03/20/15 01:15:00 PM x1xon set_by=$mqtt_1', + '03/20/15 01:01:35 PM on set_by=$mqtt_1', + ], + 'category' => 'Other', + 'topic' => 'home/ha/text/x10/1', + 'said' => undef, + 'retain' => 0, + 'tied_events' => {}, + 'change_pass' => 1, + 'target_next_pass' => [], + 'instance' => ${$VAR1}, + 'message' => '', + 'filename' => 'CapeCod_table', + 'object_name' => '$CR_Temp' + }, 'mqtt_Item' ), + bless( { + 'legacy_target' => undef, + 'tied_objects' => {}, + 'states' => [ + 'off', + 'on' + ], + 'QOS' => 0, + 'state_changed' => undef, + 'set_by' => ${$VAR1}, + 'state_prev' => 'x1xon', + 'set_time' => 1427206784, + 'state_now' => undef, + 'state' => 'x1xon', + 'target' => '', + 'setby_next_pass' => [], + 'state_next_pass' => [], + 'state_log' => [ + '03/24/15 10:19:44 AM x1xon set_by=$mqtt_3', + '03/24/15 10:18:08 AM x1xon set_by=$mqtt_3', + '03/24/15 10:15:54 AM x1xon set_by=$mqtt_3', + '03/24/15 10:12:59 AM x1xon set_by=$mqtt_3', + '03/24/15 10:10:10 AM x1xon set_by=$mqtt_3', + '03/24/15 08:22:51 AM x1xon set_by=$mqtt_3', + '03/23/15 11:08:48 AM x1xon set_by=$mqtt_3', + '03/20/15 02:51:02 PM x1xon set_by=' + ], + 'category' => 'Other', + 'topic' => 'home/network/test/x10/1', + 'said' => undef, + 'retain' => 0, + 'tied_events' => {}, + 'change_pass' => 9, + 'target_next_pass' => [], + 'instance' => ${$VAR1}, + 'message' => '', + 'filename' => 'CapeCod_table', + 'object_name' => '$M3_Temp' + }, 'mqtt_Item' ) + ], + 'command_stack' => [], + 'category' => 'Other', + 'topic' => 'home/ha/#', + 'got_ping_response' => 1, + 'said' => '', + 'instance' => 'mqtt_1', + 'port' => 1883, + 'keep_alive_timer' => 121, + 'host' => '127.0.0.1', + 'socket' => bless( \*Symbol::GEN6, 'IO::Socket::INET' ), + 'filename' => 'CapeCod_table', + 'next_ping' => '1427206904.98476', + 'object_name' => '$mqtt_3' + }, 'mqtt' ); + +-------------------------------------------------------------------------------- =cut From 56c4d944f1ff73a7ab25d5f31dcb4251187832db Mon Sep 17 00:00:00 2001 From: Lieven Hollevoet Date: Sun, 26 Jul 2015 22:28:56 +0200 Subject: [PATCH 4/4] Added authentication support --- lib/mqtt.pm | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/mqtt.pm b/lib/mqtt.pm index a9269e328..1090cc5f9 100644 --- a/lib/mqtt.pm +++ b/lib/mqtt.pm @@ -156,7 +156,6 @@ Notes: instance (same socket) is used but this causes issues and at reconnect we need to resubscribe. There is no way to do that now (we'll need to resubscribe all the same socket related subscriptions) - @FIXME: user_name/password isn't working, I don't know why yet @FIXME: We're really not checking for ConnAck or SubAck. @FIXME: there is no reconnect logic @FIXME: No SSL @@ -237,7 +236,7 @@ sub mqtt_reconnect() { # -------------------------------------------------------------------------- ### 2) Send MQTT_CONNECT - $self->send_mqtt_msg(message_type => MQTT_CONNECT, keep_alive_timer => $self->{keep_alive_timer}); + $self->send_mqtt_msg(message_type => MQTT_CONNECT, keep_alive_timer => $self->{keep_alive_timer}, user_name => $self->{user_name}, password => $self->{password}); ### 3) Check for ACK or fail &main::print_log("*** mqtt $$self{instance} Socket check #2 ($$self{keep_alive_timer}) [ $! ]: " . ($self->isConnected() ? "Connected" : "Failed")) if($main::Debug{mqtt}); @@ -298,7 +297,7 @@ sub mqtt_connect() { # -------------------------------------------------------------------------- ### 2) Send MQTT_CONNECT - $self->send_mqtt_msg(message_type => MQTT_CONNECT, keep_alive_timer => $self->{keep_alive_timer}); + $self->send_mqtt_msg(message_type => MQTT_CONNECT, keep_alive_timer => $self->{keep_alive_timer}, , user_name => $self->{user_name}, password => $self->{password}); ### 3) Check for ACK or fail &main::print_log("*** mqtt Socket check ($$self{keep_alive_timer}) [ $! ]: " . ($self->isConnected() ? "Connected" : "Failed")) if($main::Debug{mqtt});