isc.sensor.outputadaptor.oaextensions
Class SQLSink

java.lang.Object
  extended by org.openadaptor.adaptor.SimpleComponent
      extended by org.openadaptor.adaptor.AbstractWriter
          extended by org.openadaptor.adaptor.AbstractSimpleSink
              extended by isc.sensor.outputadaptor.oaextensions.SQLSink
All Implemented Interfaces:
org.openadaptor.adaptor.CallbackProvider, org.openadaptor.adaptor.Component, org.openadaptor.adaptor.Configurable, org.openadaptor.adaptor.IbafTransactionalResource, org.openadaptor.adaptor.JdbcTransactionalResource, org.openadaptor.adaptor.Sink, org.openadaptor.doconverter.DOMessageReaderDelegate

public class SQLSink
extends org.openadaptor.adaptor.AbstractSimpleSink
implements org.openadaptor.adaptor.JdbcTransactionalResource

This SQLSink works with SimpleSinkSQLGenerator to provide support for non-stored procedure databases like MySQL. Unfortunilty Open Adaptor 1.7.1 has a brain dead support for MySql via SQLSink and SimpleSinkSQLGenerator. This file has been copied and modified from the original 1.7.1 code.

Functionality Added:

  1. Support for multiple SQL Statements at the main sink level.
  2. Support for multiple Object Types
  3. Support for multiple SQL Statements per Object Type
  4. Support for inline object attribute transformers

SQL Statements at main level:
This is generally used when only one object type if defined. SQL statements are parameterised by the DataObject attributes names (delimited by %).

        A.C2.SQLStatement1 = insert orders select MAX(order_id)+1, '%ric_code%', %size%, %price%, getdate(), NULL from orders
                A.C2.SQLStatement2 = ....
                A.C2.SQLStatementn = ....
 

SQL Statements with Multip Object Types:
Each object defined in for this sink can contain one or many SQL statements for that object. This allows each object to be sent to multiple tables/database or combined with other data or split apart. SQL statements are parameterised by the DataObject attributes names (delimited by %).

Each object type should be defined as typename by a numerically increasing type number. For example: A.C2.Type1 = typename. The typename prefix should then be used to categorized the Sink type SQL statements. For example A.C2.typename.SQLStatement. if one SQL statements is defined use the single "SQLStatement" parameter. If more than one SQL statements are defined then use the numerical for "SQLStatementN" where N equals sql statement number (1, 2, 3, etc).

   
                A.C2.Type1 = UDPSession
                A.C2.UDPSession.SQLStatement = Insert Into sessions (startTime, sensorName, interfaceF1, interfaceF2, sessionKey, duration,
                protocolName, clientAddr, clientPort, serverAddr, serverPort, status, serviceName, packetsSent, packetsRecv, dataSent, dataRecv, retryPktSent,
                retryPktRecv) Values ('%startTime%', '%sensorName%', '%interface_f1%', '%interface_f2%', '%session%', %duration%, '%protocol%', '%client_addr%',
                %client_port%, '%server_addr%', %server_port%, '%status%', '%service_name%', %packets_sent%, %packets_recv%, %data_sent%, %data_recv%,
                %retry_packets_sent%, %retry_packets_recv%);
 
Inline Object Attribute Transformer Supports calling a transformer on an attribute prior to executing SQL statements. Since SQLSinks work on simple string attributes, it is sometimes necessary to transform attributed from a complex datatype to a string, for example a primitive array byte[], long[], etc. In some cases this mechaism is easier than using a transformer pipe, but functions the same way.


Specify the "NumAttributes = N" parameter where N is the total number of attributes in the type. Then specify the "AttNameN" and "AttTransformerN for the attribute to be transformed. AttNameN is the name of the attribute where N is the location of the attribute in the object. AttTransformerN is the fully qualified transformer object to be called and N is the location of the attributed in the object.

 
                A.C2.Type3 = TCPFlowSummary
                A.C2.TCPFlowSummary.NumAttributes=18
                A.C2.TCPFlowSummary.AttName17 = packetTimings
                A.C2.TCPFlowSummary.AttTransformer17 = isc.sensor.outputadaptor.oaextensions.PrimitiveArrayTransformerToString
                A.C2.TCPFlowSummary.AttName18 = tcpFlags
                A.C2.TCPFlowSummary.AttTransformer18 = isc.sensor.outputadaptor.oaextensions.PrimitiveArrayTransformerToString
 

See Also:
org.openadaptor.dostrings.transformer.AbstractTransformer, Openadaptor documentation

Nested Class Summary
 
Nested classes/interfaces inherited from class org.openadaptor.adaptor.SimpleComponent
org.openadaptor.adaptor.SimpleComponent.StateHolder
 
Field Summary
protected  java.sql.Connection _dbConnection
          The JDBC connection we establish.
protected  org.openadaptor.adaptor.util.JdbcConnectionParams _jdbcConnectionParams
          Jdbc connection container
(package private)  org.openadaptor.adaptor.jdbc.SinkSQLGenerator _sql_gen
          SQL Generator
(package private) static org.apache.log4j.Logger log
           
 
Fields inherited from class org.openadaptor.adaptor.AbstractSimpleSink
_continue_on_exception
 
Fields inherited from class org.openadaptor.adaptor.AbstractWriter
_packetName, _packetSize, _record_delimiter, _string_writer, _writeBatchAsRecord, _writer, PACKET_NAME, PACKET_SIZE
 
Fields inherited from class org.openadaptor.adaptor.SimpleComponent
_controller, _formatter, _name, _propsPrefix, _readerDelegate, _textEncoding, MESSAGE_WRITER_PREFIX, MESSAGEID_ATTR, TEXT_ENCODING_ATTR
 
Constructor Summary
SQLSink()
           
 
Method Summary
protected  void connectToServer()
           
 java.sql.Connection getJdbcConnection()
          get jdbc connection
 void init(java.lang.String name, java.util.Properties props, java.lang.String prefix, org.openadaptor.adaptor.Controller controller)
          Initialise from properties object.
 boolean isManagingOwnTransactions()
          Flag stating whether this Sink is managing its own transactions or not.
 void processMessage(org.openadaptor.adaptor.Message message)
          checks continue on exception flag, this will either throw a pipeline exception or mark data array element with exception and set mapping /** Processes the messages passed to it by the controller.
 void setJdbcConnection(java.sql.Connection c)
          set this components jdbc connection, throws exception if this is not possible
 void setManagingOwnTransactions(boolean managingOwnTransactions)
          instruct component to manage own connections throws exception if this is not possible
protected  boolean showSQLWarnings(java.sql.SQLWarning warn)
           
 void txnBegin()
          Called by the controller when a txnBegin is requested by the Source of a message
 void txnCommit()
          Called by the controller when a txnCommit is requested by the Source of a message
 void txnCommitWithExceptions(org.openadaptor.adaptor.Message message)
          Called by the controller when a txnCommit is requested by the Source of a message In this case not all of the data in the message was successfully processed, you can get an array of Exceptions using Message.getExceptions, this corresponds to the array of data objects.
 void txnRollback()
          Called by the controller when a txnRollback is requested by the Source of a message
 
Methods inherited from class org.openadaptor.adaptor.AbstractSimpleSink
cleanUp, processHospitalException, writerHospitalException
 
Methods inherited from class org.openadaptor.adaptor.AbstractWriter
getDOStringWriter, getRecordDelimiter, getWriter, setDOStringWriter, setRecordDelimiter, setWriter, writeDataObjects, writeMessage, writerCleanUp, writeRecord, writerStartUp
 
Methods inherited from class org.openadaptor.adaptor.SimpleComponent
addCallback, canUpdateWhileRunning, customControl, didReceiveMessage, getAsProperties, getCallbackManager, getCurrentState, getCustomControlProperties, getLastUID, getName, getProperty, getProperty, getPropsPrefix, getRequestedState, getSecurityManager, getStatus, getTextEncoding, getXMLFormatter, notifyEvent, pause, removeCallback, resume, setCallbackManager, setCallbackManager, setCurrentState, setFromProperties, setFromProperties, setFromResource, setName, setPropsPrefix, setRequestedState, terminate, txnRollback, waitForRequestedStateChange
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface org.openadaptor.adaptor.IbafTransactionalResource
txnRollback
 
Methods inherited from interface org.openadaptor.adaptor.Component
customControl, getName, getPropsPrefix, getRequestedState, getStatus, pause, resume, setName, setPropsPrefix, terminate
 
Methods inherited from interface org.openadaptor.adaptor.Configurable
canUpdateWhileRunning, getAsProperties, setFromProperties, setFromProperties
 
Methods inherited from interface org.openadaptor.adaptor.CallbackProvider
addCallback, getCallbackManager, notifyEvent, removeCallback, setCallbackManager, setCallbackManager
 

Field Detail

log

static org.apache.log4j.Logger log

_sql_gen

org.openadaptor.adaptor.jdbc.SinkSQLGenerator _sql_gen
SQL Generator


_jdbcConnectionParams

protected org.openadaptor.adaptor.util.JdbcConnectionParams _jdbcConnectionParams
Jdbc connection container


_dbConnection

protected java.sql.Connection _dbConnection
The JDBC connection we establish.

Constructor Detail

SQLSink

public SQLSink()
Method Detail

init

public void init(java.lang.String name,
                 java.util.Properties props,
                 java.lang.String prefix,
                 org.openadaptor.adaptor.Controller controller)
          throws org.openadaptor.adaptor.IbafException
Initialise from properties object. See class comment for property values

Specified by:
init in interface org.openadaptor.adaptor.Component
Overrides:
init in class org.openadaptor.adaptor.AbstractSimpleSink
Parameters:
props - Properties object
prefix - Prefix string to search for properties
name - The name to be given to the component
controller - The component's controller object
Throws:
DOStringException - Thrown if initialisation fails, mandatory properties are missing or properties cannot be parsed.
org.openadaptor.adaptor.IbafException - Thrown if initialisation fails, mandatory properties are missing or properties cannot be parsed.
See Also:
Controller, SimpleController

processMessage

public void processMessage(org.openadaptor.adaptor.Message message)
                    throws org.openadaptor.adaptor.PipelineException
Description copied from class: org.openadaptor.adaptor.AbstractSimpleSink
checks continue on exception flag, this will either throw a pipeline exception or mark data array element with exception and set mapping /** Processes the messages passed to it by the controller. Typically a Sink will take some decisive, final action with the message it receives. For example

Specified by:
processMessage in interface org.openadaptor.adaptor.Sink
Specified by:
processMessage in class org.openadaptor.adaptor.AbstractSimpleSink
Throws:
org.openadaptor.adaptor.PipelineException - If processing fails for any reason.

txnBegin

public void txnBegin()
              throws org.openadaptor.adaptor.IbafException
Description copied from class: org.openadaptor.adaptor.SimpleComponent
Called by the controller when a txnBegin is requested by the Source of a message

Specified by:
txnBegin in interface org.openadaptor.adaptor.IbafTransactionalResource
Overrides:
txnBegin in class org.openadaptor.adaptor.SimpleComponent
Throws:
org.openadaptor.adaptor.IbafException - If fails to begin the transaction
See Also:
Controller.txnBegin(Source)

txnCommitWithExceptions

public void txnCommitWithExceptions(org.openadaptor.adaptor.Message message)
                             throws org.openadaptor.adaptor.IbafException
Description copied from class: org.openadaptor.adaptor.SimpleComponent
Called by the controller when a txnCommit is requested by the Source of a message In this case not all of the data in the message was successfully processed, you can get an array of Exceptions using Message.getExceptions, this corresponds to the array of data objects. This default implementation must be overridden to support this behaviour

Specified by:
txnCommitWithExceptions in interface org.openadaptor.adaptor.IbafTransactionalResource
Overrides:
txnCommitWithExceptions in class org.openadaptor.adaptor.SimpleComponent
Throws:
org.openadaptor.adaptor.IbafException - If fails to commit the transaction
See Also:
Controller.txnCommit(Source)

txnCommit

public void txnCommit()
               throws org.openadaptor.adaptor.IbafException
Description copied from class: org.openadaptor.adaptor.SimpleComponent
Called by the controller when a txnCommit is requested by the Source of a message

Specified by:
txnCommit in interface org.openadaptor.adaptor.IbafTransactionalResource
Overrides:
txnCommit in class org.openadaptor.adaptor.SimpleComponent
Throws:
org.openadaptor.adaptor.IbafException - If fails to commit the transaction
See Also:
Controller.txnCommit(Source)

txnRollback

public void txnRollback()
                 throws org.openadaptor.adaptor.IbafException
Description copied from class: org.openadaptor.adaptor.SimpleComponent
Called by the controller when a txnRollback is requested by the Source of a message

Specified by:
txnRollback in interface org.openadaptor.adaptor.IbafTransactionalResource
Overrides:
txnRollback in class org.openadaptor.adaptor.SimpleComponent
Throws:
org.openadaptor.adaptor.IbafException - If fails to rollback the transaction
See Also:
Controller.txnRollback(Source)

isManagingOwnTransactions

public boolean isManagingOwnTransactions()
Flag stating whether this Sink is managing its own transactions or not. This is set to false by the SimpleController is involved in the transaction AND JdbcTransactionalResource is enabled (at the adaptor configuration level)

Specified by:
isManagingOwnTransactions in interface org.openadaptor.adaptor.JdbcTransactionalResource

setManagingOwnTransactions

public void setManagingOwnTransactions(boolean managingOwnTransactions)
                                throws org.openadaptor.adaptor.IbafException
Description copied from interface: org.openadaptor.adaptor.JdbcTransactionalResource
instruct component to manage own connections throws exception if this is not possible

Specified by:
setManagingOwnTransactions in interface org.openadaptor.adaptor.JdbcTransactionalResource
Throws:
org.openadaptor.adaptor.IbafException
See Also:
isManagingOwnTransactions()

getJdbcConnection

public java.sql.Connection getJdbcConnection()
Description copied from interface: org.openadaptor.adaptor.JdbcTransactionalResource
get jdbc connection

Specified by:
getJdbcConnection in interface org.openadaptor.adaptor.JdbcTransactionalResource
See Also:
_dbConnection

setJdbcConnection

public void setJdbcConnection(java.sql.Connection c)
                       throws org.openadaptor.adaptor.IbafException
Description copied from interface: org.openadaptor.adaptor.JdbcTransactionalResource
set this components jdbc connection, throws exception if this is not possible

Specified by:
setJdbcConnection in interface org.openadaptor.adaptor.JdbcTransactionalResource
Throws:
org.openadaptor.adaptor.IbafException
See Also:
_dbConnection

connectToServer

protected void connectToServer()
                        throws org.openadaptor.adaptor.IbafException
Throws:
org.openadaptor.adaptor.IbafException

showSQLWarnings

protected boolean showSQLWarnings(java.sql.SQLWarning warn)