api: use updated mysql driver on github (git://github.com/dizzyd/erlang-mysql-driver.git). New features include prepared statements, transactions, binary queries, type-converted query results, more efficient logging and a new connection pooling mechanism.
This commit is contained in:
parent
3de06ee609
commit
cf79030ec1
File diff suppressed because it is too large
Load Diff
|
@ -172,23 +172,21 @@ bxor_binary(B1, B2) ->
|
||||||
E1 bxor E2
|
E1 bxor E2
|
||||||
end, binary_to_list(B1), binary_to_list(B2))).
|
end, binary_to_list(B1), binary_to_list(B2))).
|
||||||
|
|
||||||
|
password_new([], _Salt) ->
|
||||||
|
<<>>;
|
||||||
password_new(Password, Salt) ->
|
password_new(Password, Salt) ->
|
||||||
%% Check for blank password
|
Stage1 = crypto:sha(Password),
|
||||||
case length(Password) of
|
Stage2 = crypto:sha(Stage1),
|
||||||
0 ->
|
Res = crypto:sha_final(
|
||||||
<<>>;
|
crypto:sha_update(
|
||||||
_ ->
|
crypto:sha_update(crypto:sha_init(), Salt),
|
||||||
Stage1 = crypto:sha(Password),
|
Stage2)
|
||||||
Stage2 = crypto:sha(Stage1),
|
),
|
||||||
Res = crypto:sha_final(
|
bxor_binary(Res, Stage1).
|
||||||
crypto:sha_update(
|
|
||||||
crypto:sha_update(crypto:sha_init(), Salt),
|
|
||||||
Stage2)
|
|
||||||
),
|
|
||||||
bxor_binary(Res, Stage1)
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_send(Sock, Packet, Num, LogFun) ->
|
do_send(Sock, Packet, Num, LogFun) ->
|
||||||
mysql:log(LogFun, debug, "mysql_auth send packet ~p: ~p", [Num, Packet]),
|
LogFun(?MODULE, ?LINE, debug,
|
||||||
|
fun() -> {"mysql_auth send packet ~p: ~p", [Num, Packet]} end),
|
||||||
Data = <<(size(Packet)):24/little, Num:8, Packet/binary>>,
|
Data = <<(size(Packet)):24/little, Num:8, Packet/binary>>,
|
||||||
gen_tcp:send(Sock, Data).
|
gen_tcp:send(Sock, Data).
|
||||||
|
|
|
@ -9,6 +9,13 @@
|
||||||
%%% Note : All MySQL code was written by Magnus Ahltorp, originally
|
%%% Note : All MySQL code was written by Magnus Ahltorp, originally
|
||||||
%%% in the file mysql.erl - I just moved it here.
|
%%% in the file mysql.erl - I just moved it here.
|
||||||
%%%
|
%%%
|
||||||
|
%%% Modified: 12 Sep 2006 by Yariv Sadan <yarivvv@gmail.com>
|
||||||
|
%%% Added automatic type conversion between MySQL types and Erlang types
|
||||||
|
%%% and different logging style.
|
||||||
|
%%%
|
||||||
|
%%% Modified: 23 Sep 2006 by Yariv Sadan <yarivvv@gmail.com>
|
||||||
|
%%% Added transaction handling and prepared statement execution.
|
||||||
|
%%%
|
||||||
%%% Copyright (c) 2001-2004 Kungliga Tekniska Högskolan
|
%%% Copyright (c) 2001-2004 Kungliga Tekniska Högskolan
|
||||||
%%% See the file COPYING
|
%%% See the file COPYING
|
||||||
%%%
|
%%%
|
||||||
|
@ -63,11 +70,20 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% External exports
|
%% External exports
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-export([start/6,
|
-export([start/8,
|
||||||
start_link/6,
|
start_link/8,
|
||||||
fetch/3,
|
fetch/3,
|
||||||
fetch/4,
|
fetch/4,
|
||||||
squery/4
|
execute/5,
|
||||||
|
execute/6,
|
||||||
|
transaction/3,
|
||||||
|
transaction/4
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% private exports to be called only from the 'mysql' module
|
||||||
|
-export([fetch_local/2,
|
||||||
|
execute_local/3,
|
||||||
|
get_pool_id/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -82,16 +98,32 @@
|
||||||
log_fun,
|
log_fun,
|
||||||
recv_pid,
|
recv_pid,
|
||||||
socket,
|
socket,
|
||||||
data
|
data,
|
||||||
|
|
||||||
|
%% maps statement names to their versions
|
||||||
|
prepares = gb_trees:empty(),
|
||||||
|
|
||||||
|
%% the id of the connection pool to which this connection belongs
|
||||||
|
pool_id
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-define(SECURE_CONNECTION, 32768).
|
-define(SECURE_CONNECTION, 32768).
|
||||||
-define(MYSQL_QUERY_OP, 3).
|
-define(MYSQL_QUERY_OP, 3).
|
||||||
-define(DEFAULT_STANDALONE_TIMEOUT, 5000).
|
-define(DEFAULT_STANDALONE_TIMEOUT, 5000).
|
||||||
-define(DEFAULT_RESULT_TYPE, list).
|
|
||||||
-define(MYSQL_4_0, 40). %% Support for MySQL 4.0.x
|
-define(MYSQL_4_0, 40). %% Support for MySQL 4.0.x
|
||||||
-define(MYSQL_4_1, 41). %% Support for MySQL 4.1.x et 5.0.x
|
-define(MYSQL_4_1, 41). %% Support for MySQL 4.1.x et 5.0.x
|
||||||
|
|
||||||
|
%% Used by transactions to get the state variable for this connection
|
||||||
|
%% when bypassing the dispatcher.
|
||||||
|
-define(STATE_VAR, mysql_connection_state).
|
||||||
|
|
||||||
|
-define(Log(LogFun,Level,Msg),
|
||||||
|
LogFun(?MODULE, ?LINE,Level,fun()-> {Msg,[]} end)).
|
||||||
|
-define(Log2(LogFun,Level,Msg,Params),
|
||||||
|
LogFun(?MODULE, ?LINE,Level,fun()-> {Msg,Params} end)).
|
||||||
|
-define(L(Msg), io:format("~p:~b ~p ~n", [?MODULE, ?LINE, Msg])).
|
||||||
|
|
||||||
|
|
||||||
%%====================================================================
|
%%====================================================================
|
||||||
%% External functions
|
%% External functions
|
||||||
%%====================================================================
|
%%====================================================================
|
||||||
|
@ -111,37 +143,41 @@
|
||||||
%% Pid = pid()
|
%% Pid = pid()
|
||||||
%% Reason = string()
|
%% Reason = string()
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
start(Host, Port, User, Password, Database, LogFun) when is_list(Host), is_integer(Port), is_list(User),
|
start(Host, Port, User, Password, Database, LogFun, Encoding, PoolId) ->
|
||||||
is_list(Password), is_list(Database) ->
|
|
||||||
ConnPid = self(),
|
ConnPid = self(),
|
||||||
Pid = spawn(fun () ->
|
Pid = spawn(fun () ->
|
||||||
init(Host, Port, User, Password, Database, LogFun, ConnPid)
|
init(Host, Port, User, Password, Database,
|
||||||
|
LogFun, Encoding, PoolId, ConnPid)
|
||||||
end),
|
end),
|
||||||
post_start(Pid, LogFun).
|
post_start(Pid, LogFun).
|
||||||
|
|
||||||
start_link(Host, Port, User, Password, Database, LogFun) when is_list(Host), is_integer(Port), is_list(User),
|
start_link(Host, Port, User, Password, Database, LogFun, Encoding, PoolId) ->
|
||||||
is_list(Password), is_list(Database) ->
|
|
||||||
ConnPid = self(),
|
ConnPid = self(),
|
||||||
Pid = spawn_link(fun () ->
|
Pid = spawn_link(fun () ->
|
||||||
init(Host, Port, User, Password, Database, LogFun, ConnPid)
|
init(Host, Port, User, Password, Database,
|
||||||
end),
|
LogFun, Encoding, PoolId, ConnPid)
|
||||||
|
end),
|
||||||
post_start(Pid, LogFun).
|
post_start(Pid, LogFun).
|
||||||
|
|
||||||
%% part of start/6 or start_link/6:
|
%% part of start/6 or start_link/6:
|
||||||
post_start(Pid, _LogFun) ->
|
post_start(Pid, LogFun) ->
|
||||||
%%Timeout = get_option(timeout, Options, ?DEFAULT_STANDALONE_TIMEOUT),
|
|
||||||
%%TODO find a way to get configured Options here
|
|
||||||
Timeout= ?DEFAULT_STANDALONE_TIMEOUT,
|
|
||||||
receive
|
receive
|
||||||
{mysql_conn, Pid, ok} ->
|
{mysql_conn, Pid, ok} ->
|
||||||
{ok, Pid};
|
{ok, Pid};
|
||||||
{mysql_conn, Pid, {error, Reason}} ->
|
{mysql_conn, Pid, {error, Reason}} ->
|
||||||
{error, Reason}
|
{error, Reason};
|
||||||
% Unknown ->
|
{mysql_conn, OtherPid, {error, Reason}} ->
|
||||||
% mysql:log(_LogFun, error, "mysql_conn: Received unknown signal, exiting"),
|
% Ignore error message from other processes. This handles the case
|
||||||
% mysql:log(_LogFun, debug, "mysql_conn: Unknown signal : ~p", [Unknown]),
|
% when mysql is shutdown and takes more than 5 secs to close the
|
||||||
% {error, "unknown signal received"}
|
% listener socket.
|
||||||
after Timeout ->
|
?Log2(LogFun, debug, "Ignoring message from process ~p | Reason: ~p",
|
||||||
|
[OtherPid, Reason]),
|
||||||
|
post_start(Pid, LogFun);
|
||||||
|
Unknown ->
|
||||||
|
?Log2(LogFun, error,
|
||||||
|
"received unknown signal, exiting: ~p", [Unknown]),
|
||||||
|
{error, "unknown signal received"}
|
||||||
|
after 5000 ->
|
||||||
{error, "timed out"}
|
{error, "timed out"}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -149,16 +185,21 @@ post_start(Pid, _LogFun) ->
|
||||||
%% Function: fetch(Pid, Query, From)
|
%% Function: fetch(Pid, Query, From)
|
||||||
%% fetch(Pid, Query, From, Timeout)
|
%% fetch(Pid, Query, From, Timeout)
|
||||||
%% Pid = pid(), mysql_conn to send fetch-request to
|
%% Pid = pid(), mysql_conn to send fetch-request to
|
||||||
%% Query = string(), MySQL query in verbatim
|
%% Queries = A single binary() query or a list of binary() queries.
|
||||||
|
%% If a list is provided, the return value is the return
|
||||||
|
%% of the last query, or the first query that has
|
||||||
|
%% returned an error. If an error occurs, execution of
|
||||||
|
%% the following queries is aborted.
|
||||||
%% From = pid() or term(), use a From of self() when
|
%% From = pid() or term(), use a From of self() when
|
||||||
%% using this module for a single connection,
|
%% using this module for a single connection,
|
||||||
%% or pass the gen_server:call/3 From argument if
|
%% or pass the gen_server:call/3 From argument if
|
||||||
%% using a gen_server to do the querys (e.g. the
|
%% using a gen_server to do the querys (e.g. the
|
||||||
%% mysql_dispatcher)
|
%% mysql_dispatcher)
|
||||||
%% Timeout = integer() | infinity, gen_server timeout value
|
%% Timeout = integer() | infinity, gen_server timeout value
|
||||||
%% Descrip.: Send a query and wait for the result if running stand-
|
%% Descrip.: Send a query or a list of queries and wait for the result
|
||||||
%% alone (From = self()), but don't block the caller if we
|
%% if running stand-alone (From = self()), but don't block
|
||||||
%% are not running stand-alone (From = gen_server From).
|
%% the caller if we are not running stand-alone
|
||||||
|
%% (From = gen_server From).
|
||||||
%% Returns : ok | (non-stand-alone mode)
|
%% Returns : ok | (non-stand-alone mode)
|
||||||
%% {data, #mysql_result} | (stand-alone mode)
|
%% {data, #mysql_result} | (stand-alone mode)
|
||||||
%% {updated, #mysql_result} | (stand-alone mode)
|
%% {updated, #mysql_result} | (stand-alone mode)
|
||||||
|
@ -167,28 +208,41 @@ post_start(Pid, _LogFun) ->
|
||||||
%% Rows = list() of [string()]
|
%% Rows = list() of [string()]
|
||||||
%% Reason = term()
|
%% Reason = term()
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
fetch(Pid, Queries, From) ->
|
||||||
|
fetch(Pid, Queries, From, ?DEFAULT_STANDALONE_TIMEOUT).
|
||||||
|
|
||||||
fetch(Pid, Query, From) ->
|
fetch(Pid, Queries, From, Timeout) ->
|
||||||
squery(Pid, Query, From, []).
|
do_fetch(Pid, Queries, From, Timeout).
|
||||||
fetch(Pid, Query, From, Timeout) ->
|
|
||||||
squery(Pid, Query, From, [{timeout, Timeout}]).
|
|
||||||
|
|
||||||
squery(Pid, Query, From, Options) when is_pid(Pid), is_list(Query) ->
|
execute(Pid, Name, Version, Params, From) ->
|
||||||
Self = self(),
|
execute(Pid, Name, Version, Params, From, ?DEFAULT_STANDALONE_TIMEOUT).
|
||||||
Timeout = get_option(timeout, Options, ?DEFAULT_STANDALONE_TIMEOUT),
|
|
||||||
Pid ! {fetch, Query, From, Options},
|
execute(Pid, Name, Version, Params, From, Timeout) ->
|
||||||
case From of
|
send_msg(Pid, {execute, Name, Version, Params, From}, From, Timeout).
|
||||||
Self ->
|
|
||||||
%% We are not using a mysql_dispatcher, await the response
|
transaction(Pid, Fun, From) ->
|
||||||
receive
|
transaction(Pid, Fun, From, ?DEFAULT_STANDALONE_TIMEOUT).
|
||||||
{fetch_result, Pid, Result} ->
|
|
||||||
Result
|
transaction(Pid, Fun, From, Timeout) ->
|
||||||
after Timeout ->
|
send_msg(Pid, {transaction, Fun, From}, From, Timeout).
|
||||||
{error, "query timed out"}
|
|
||||||
end;
|
get_pool_id(State) ->
|
||||||
_ ->
|
State#state.pool_id.
|
||||||
%% From is gen_server From, Pid will do gen_server:reply() when it has an answer
|
|
||||||
ok
|
%%====================================================================
|
||||||
|
%% Internal functions
|
||||||
|
%%====================================================================
|
||||||
|
|
||||||
|
fetch_local(State, Query) ->
|
||||||
|
do_query(State, Query).
|
||||||
|
|
||||||
|
execute_local(State, Name, Params) ->
|
||||||
|
case do_execute(State, Name, Params, undefined) of
|
||||||
|
{ok, Res, State1} ->
|
||||||
|
put(?STATE_VAR, State1),
|
||||||
|
Res;
|
||||||
|
Err ->
|
||||||
|
Err
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -205,28 +259,47 @@ squery(Pid, Query, From, Options) when is_pid(Pid), is_list(Query) ->
|
||||||
%%
|
%%
|
||||||
%% Note : Only to be used externally by the 'mysql_auth' module.
|
%% Note : Only to be used externally by the 'mysql_auth' module.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
do_recv(LogFun, RecvPid, SeqNum) when is_function(LogFun); LogFun == undefined, SeqNum == undefined ->
|
do_recv(LogFun, RecvPid, SeqNum) when is_function(LogFun);
|
||||||
|
LogFun == undefined,
|
||||||
|
SeqNum == undefined ->
|
||||||
receive
|
receive
|
||||||
{mysql_recv, RecvPid, data, Packet, Num} ->
|
{mysql_recv, RecvPid, data, Packet, Num} ->
|
||||||
%%mysql:log(LogFun, debug, "mysql_conn: recv packet ~p: ~p", [Num, Packet]),
|
|
||||||
{ok, Packet, Num};
|
{ok, Packet, Num};
|
||||||
{mysql_recv, RecvPid, closed, _E} ->
|
{mysql_recv, RecvPid, closed, _E} ->
|
||||||
{error, "mysql_recv: socket was closed"}
|
{error, "mysql_recv: socket was closed"}
|
||||||
end;
|
end;
|
||||||
do_recv(LogFun, RecvPid, SeqNum) when is_function(LogFun); LogFun == undefined, is_integer(SeqNum) ->
|
do_recv(LogFun, RecvPid, SeqNum) when is_function(LogFun);
|
||||||
|
LogFun == undefined,
|
||||||
|
is_integer(SeqNum) ->
|
||||||
ResponseNum = SeqNum + 1,
|
ResponseNum = SeqNum + 1,
|
||||||
receive
|
receive
|
||||||
{mysql_recv, RecvPid, data, Packet, ResponseNum} ->
|
{mysql_recv, RecvPid, data, Packet, ResponseNum} ->
|
||||||
%%mysql:log(LogFun, debug, "mysql_conn: recv packet ~p: ~p", [ResponseNum, Packet]),
|
|
||||||
{ok, Packet, ResponseNum};
|
{ok, Packet, ResponseNum};
|
||||||
{mysql_recv, RecvPid, closed, _E} ->
|
{mysql_recv, RecvPid, closed, _E} ->
|
||||||
{error, "mysql_recv: socket was closed"}
|
{error, "mysql_recv: socket was closed"}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
do_fetch(Pid, Queries, From, Timeout) ->
|
||||||
|
send_msg(Pid, {fetch, Queries, From}, From, Timeout).
|
||||||
|
|
||||||
|
send_msg(Pid, Msg, From, Timeout) ->
|
||||||
|
Self = self(),
|
||||||
|
Pid ! Msg,
|
||||||
|
case From of
|
||||||
|
Self ->
|
||||||
|
%% We are not using a mysql_dispatcher, await the response
|
||||||
|
receive
|
||||||
|
{fetch_result, Pid, Result} ->
|
||||||
|
Result
|
||||||
|
after Timeout ->
|
||||||
|
{error, "message timed out"}
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
%% From is gen_server From,
|
||||||
|
%% Pid will do gen_server:reply() when it has an answer
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
%%====================================================================
|
|
||||||
%% Internal functions
|
|
||||||
%%====================================================================
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Function: init(Host, Port, User, Password, Database, LogFun,
|
%% Function: init(Host, Port, User, Password, Database, LogFun,
|
||||||
|
@ -236,30 +309,47 @@ do_recv(LogFun, RecvPid, SeqNum) when is_function(LogFun); LogFun == undefined,
|
||||||
%% User = string()
|
%% User = string()
|
||||||
%% Password = string()
|
%% Password = string()
|
||||||
%% Database = string()
|
%% Database = string()
|
||||||
%% LogFun = undefined | function() of arity 3
|
%% LogFun = function() of arity 4
|
||||||
%% Parent = pid() of process starting this mysql_conn
|
%% Parent = pid() of process starting this mysql_conn
|
||||||
%% Descrip.: Connect to a MySQL server, log in and chooses a database.
|
%% Descrip.: Connect to a MySQL server, log in and chooses a database.
|
||||||
%% Report result of this to Parent, and then enter loop() if
|
%% Report result of this to Parent, and then enter loop() if
|
||||||
%% we were successfull.
|
%% we were successfull.
|
||||||
%% Returns : void() | does not return
|
%% Returns : void() | does not return
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
init(Host, Port, User, Password, Database, LogFun, Parent) ->
|
init(Host, Port, User, Password, Database, LogFun, Encoding, PoolId, Parent) ->
|
||||||
case mysql_recv:start_link(Host, Port, LogFun, self()) of
|
case mysql_recv:start_link(Host, Port, LogFun, self()) of
|
||||||
{ok, RecvPid, Sock} ->
|
{ok, RecvPid, Sock} ->
|
||||||
case mysql_init(Sock, RecvPid, User, Password, LogFun) of
|
case mysql_init(Sock, RecvPid, User, Password, LogFun) of
|
||||||
{ok, Version} ->
|
{ok, Version} ->
|
||||||
case do_query(Sock, RecvPid, LogFun, "use " ++ Database, Version, [{result_type, binary}]) of
|
Db = iolist_to_binary(Database),
|
||||||
|
case do_query(Sock, RecvPid, LogFun,
|
||||||
|
<<"use ", Db/binary>>,
|
||||||
|
Version) of
|
||||||
{error, MySQLRes} ->
|
{error, MySQLRes} ->
|
||||||
mysql:log(LogFun, error, "mysql_conn: Failed changing to database ~p : ~p",
|
?Log2(LogFun, error,
|
||||||
[Database, mysql:get_result_reason(MySQLRes)]),
|
"mysql_conn: Failed changing to database "
|
||||||
Parent ! {mysql_conn, self(), {error, failed_changing_database}};
|
"~p : ~p",
|
||||||
|
[Database,
|
||||||
|
mysql:get_result_reason(MySQLRes)]),
|
||||||
|
Parent ! {mysql_conn, self(),
|
||||||
|
{error, failed_changing_database}};
|
||||||
|
|
||||||
%% ResultType: data | updated
|
%% ResultType: data | updated
|
||||||
{_ResultType, _MySQLRes} ->
|
{_ResultType, _MySQLRes} ->
|
||||||
Parent ! {mysql_conn, self(), ok},
|
Parent ! {mysql_conn, self(), ok},
|
||||||
|
case Encoding of
|
||||||
|
undefined -> undefined;
|
||||||
|
_ ->
|
||||||
|
EncodingBinary = list_to_binary(atom_to_list(Encoding)),
|
||||||
|
do_query(Sock, RecvPid, LogFun,
|
||||||
|
<<"set names '", EncodingBinary/binary, "'">>,
|
||||||
|
Version)
|
||||||
|
end,
|
||||||
State = #state{mysql_version=Version,
|
State = #state{mysql_version=Version,
|
||||||
recv_pid = RecvPid,
|
recv_pid = RecvPid,
|
||||||
socket = Sock,
|
socket = Sock,
|
||||||
log_fun = LogFun,
|
log_fun = LogFun,
|
||||||
|
pool_id = PoolId,
|
||||||
data = <<>>
|
data = <<>>
|
||||||
},
|
},
|
||||||
loop(State)
|
loop(State)
|
||||||
|
@ -268,8 +358,9 @@ init(Host, Port, User, Password, Database, LogFun, Parent) ->
|
||||||
Parent ! {mysql_conn, self(), {error, login_failed}}
|
Parent ! {mysql_conn, self(), {error, login_failed}}
|
||||||
end;
|
end;
|
||||||
E ->
|
E ->
|
||||||
mysql:log(LogFun, error, "mysql_conn: Failed connecting to ~p:~p : ~p",
|
?Log2(LogFun, error,
|
||||||
[Host, Port, E]),
|
"failed connecting to ~p:~p : ~p",
|
||||||
|
[Host, Port, E]),
|
||||||
Parent ! {mysql_conn, self(), {error, connect_failed}}
|
Parent ! {mysql_conn, self(), {error, connect_failed}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -282,31 +373,201 @@ init(Host, Port, User, Password, Database, LogFun, Parent) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
loop(State) ->
|
loop(State) ->
|
||||||
RecvPid = State#state.recv_pid,
|
RecvPid = State#state.recv_pid,
|
||||||
|
LogFun = State#state.log_fun,
|
||||||
receive
|
receive
|
||||||
{fetch, Query, GenSrvFrom, Options} ->
|
{fetch, Queries, From} ->
|
||||||
%% GenSrvFrom is either a gen_server:call/3 From term(), or a pid if no
|
send_reply(From, do_queries(State, Queries)),
|
||||||
%% gen_server was used to make the query
|
|
||||||
Res = do_query(State, Query, Options),
|
|
||||||
case is_pid(GenSrvFrom) of
|
|
||||||
true ->
|
|
||||||
%% The query was not sent using gen_server mechanisms
|
|
||||||
GenSrvFrom ! {fetch_result, self(), Res};
|
|
||||||
false ->
|
|
||||||
gen_server:reply(GenSrvFrom, Res)
|
|
||||||
end,
|
|
||||||
loop(State);
|
loop(State);
|
||||||
|
{transaction, Fun, From} ->
|
||||||
|
put(?STATE_VAR, State),
|
||||||
|
|
||||||
|
Res = do_transaction(State, Fun),
|
||||||
|
|
||||||
|
%% The transaction may have changed the state of this process
|
||||||
|
%% if it has executed prepared statements. This would happen in
|
||||||
|
%% mysql:execute.
|
||||||
|
State1 = get(?STATE_VAR),
|
||||||
|
|
||||||
|
send_reply(From, Res),
|
||||||
|
loop(State1);
|
||||||
|
{execute, Name, Version, Params, From} ->
|
||||||
|
State1 =
|
||||||
|
case do_execute(State, Name, Params, Version) of
|
||||||
|
{error, _} = Err ->
|
||||||
|
send_reply(From, Err),
|
||||||
|
State;
|
||||||
|
{ok, Result, NewState} ->
|
||||||
|
send_reply(From, Result),
|
||||||
|
NewState
|
||||||
|
end,
|
||||||
|
loop(State1);
|
||||||
{mysql_recv, RecvPid, data, Packet, Num} ->
|
{mysql_recv, RecvPid, data, Packet, Num} ->
|
||||||
mysql:log(State#state.log_fun, error, "mysql_conn: Received MySQL data when not expecting any "
|
?Log2(LogFun, error,
|
||||||
"(num ~p) - ignoring it", [Num]),
|
"received data when not expecting any -- "
|
||||||
mysql:log(State#state.log_fun, error, "mysql_conn: Unexpected MySQL data (num ~p) :~n~p",
|
"ignoring it: {~p, ~p}", [Num, Packet]),
|
||||||
[Num, Packet]),
|
|
||||||
loop(State);
|
loop(State);
|
||||||
Unknown ->
|
Unknown ->
|
||||||
mysql:log(State#state.log_fun, error, "mysql_conn: Received unknown signal, exiting"),
|
?Log2(LogFun, error,
|
||||||
mysql:log(State#state.log_fun, debug, "mysql_conn: Unknown signal : ~p", [Unknown]),
|
"received unknown signal, exiting: ~p", [Unknown]),
|
||||||
error
|
error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% GenSrvFrom is either a gen_server:call/3 From term(),
|
||||||
|
%% or a pid if no gen_server was used to make the query
|
||||||
|
send_reply(GenSrvFrom, Res) when is_pid(GenSrvFrom) ->
|
||||||
|
%% The query was not sent using gen_server mechanisms
|
||||||
|
GenSrvFrom ! {fetch_result, self(), Res};
|
||||||
|
send_reply(GenSrvFrom, Res) ->
|
||||||
|
gen_server:reply(GenSrvFrom, Res).
|
||||||
|
|
||||||
|
do_query(State, Query) ->
|
||||||
|
do_query(State#state.socket,
|
||||||
|
State#state.recv_pid,
|
||||||
|
State#state.log_fun,
|
||||||
|
Query,
|
||||||
|
State#state.mysql_version
|
||||||
|
).
|
||||||
|
|
||||||
|
do_query(Sock, RecvPid, LogFun, Query, Version) ->
|
||||||
|
Query1 = iolist_to_binary(Query),
|
||||||
|
?Log2(LogFun, debug, "fetch ~p (id ~p)", [Query1,RecvPid]),
|
||||||
|
Packet = <<?MYSQL_QUERY_OP, Query1/binary>>,
|
||||||
|
case do_send(Sock, Packet, 0, LogFun) of
|
||||||
|
ok ->
|
||||||
|
get_query_response(LogFun,RecvPid,
|
||||||
|
Version);
|
||||||
|
{error, Reason} ->
|
||||||
|
Msg = io_lib:format("Failed sending data "
|
||||||
|
"on socket : ~p",
|
||||||
|
[Reason]),
|
||||||
|
{error, Msg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_queries(State, Queries) when not is_list(Queries) ->
|
||||||
|
do_query(State, Queries);
|
||||||
|
do_queries(State, Queries) ->
|
||||||
|
do_queries(State#state.socket,
|
||||||
|
State#state.recv_pid,
|
||||||
|
State#state.log_fun,
|
||||||
|
Queries,
|
||||||
|
State#state.mysql_version
|
||||||
|
).
|
||||||
|
|
||||||
|
%% Execute a list of queries, returning the response for the last query.
|
||||||
|
%% If a query returns an error before the last query is executed, the
|
||||||
|
%% loop is aborted and the error is returned.
|
||||||
|
do_queries(Sock, RecvPid, LogFun, Queries, Version) ->
|
||||||
|
catch
|
||||||
|
lists:foldl(
|
||||||
|
fun(Query, _LastResponse) ->
|
||||||
|
case do_query(Sock, RecvPid, LogFun, Query, Version) of
|
||||||
|
{error, _} = Err -> throw(Err);
|
||||||
|
Res -> Res
|
||||||
|
end
|
||||||
|
end, ok, Queries).
|
||||||
|
|
||||||
|
do_transaction(State, Fun) ->
|
||||||
|
case do_query(State, <<"BEGIN">>) of
|
||||||
|
{error, _} = Err ->
|
||||||
|
{aborted, Err};
|
||||||
|
_ ->
|
||||||
|
case catch Fun() of
|
||||||
|
error = Err -> rollback(State, Err);
|
||||||
|
{error, _} = Err -> rollback(State, Err);
|
||||||
|
{'EXIT', _} = Err -> rollback(State, Err);
|
||||||
|
Res ->
|
||||||
|
case do_query(State, <<"COMMIT">>) of
|
||||||
|
{error, _} = Err ->
|
||||||
|
rollback(State, {commit_error, Err});
|
||||||
|
_ ->
|
||||||
|
case Res of
|
||||||
|
{atomic, _} -> Res;
|
||||||
|
_ -> {atomic, Res}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
rollback(State, Err) ->
|
||||||
|
Res = do_query(State, <<"ROLLBACK">>),
|
||||||
|
{aborted, {Err, {rollback_result, Res}}}.
|
||||||
|
|
||||||
|
do_execute(State, Name, Params, ExpectedVersion) ->
|
||||||
|
Res = case gb_trees:lookup(Name, State#state.prepares) of
|
||||||
|
{value, Version} when Version == ExpectedVersion ->
|
||||||
|
{ok, latest};
|
||||||
|
{value, Version} ->
|
||||||
|
mysql:get_prepared(Name, Version);
|
||||||
|
none ->
|
||||||
|
mysql:get_prepared(Name)
|
||||||
|
end,
|
||||||
|
case Res of
|
||||||
|
{ok, latest} ->
|
||||||
|
{ok, do_execute1(State, Name, Params), State};
|
||||||
|
{ok, {Stmt, NewVersion}} ->
|
||||||
|
prepare_and_exec(State, Name, NewVersion, Stmt, Params);
|
||||||
|
{error, _} = Err ->
|
||||||
|
Err
|
||||||
|
end.
|
||||||
|
|
||||||
|
prepare_and_exec(State, Name, Version, Stmt, Params) ->
|
||||||
|
NameBin = atom_to_binary(Name),
|
||||||
|
StmtBin = <<"PREPARE ", NameBin/binary, " FROM '",
|
||||||
|
Stmt/binary, "'">>,
|
||||||
|
case do_query(State, StmtBin) of
|
||||||
|
{updated, _} ->
|
||||||
|
State1 =
|
||||||
|
State#state{
|
||||||
|
prepares = gb_trees:enter(Name, Version,
|
||||||
|
State#state.prepares)},
|
||||||
|
{ok, do_execute1(State1, Name, Params), State1};
|
||||||
|
{error, _} = Err ->
|
||||||
|
Err;
|
||||||
|
Other ->
|
||||||
|
{error, {unexpected_result, Other}}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_execute1(State, Name, Params) ->
|
||||||
|
Stmts = make_statements_for_execute(Name, Params),
|
||||||
|
do_queries(State, Stmts).
|
||||||
|
|
||||||
|
make_statements_for_execute(Name, []) ->
|
||||||
|
NameBin = atom_to_binary(Name),
|
||||||
|
[<<"EXECUTE ", NameBin/binary>>];
|
||||||
|
make_statements_for_execute(Name, Params) ->
|
||||||
|
NumParams = length(Params),
|
||||||
|
ParamNums = lists:seq(1, NumParams),
|
||||||
|
|
||||||
|
NameBin = atom_to_binary(Name),
|
||||||
|
|
||||||
|
ParamNames =
|
||||||
|
lists:foldl(
|
||||||
|
fun(Num, Acc) ->
|
||||||
|
ParamName = [$@ | integer_to_list(Num)],
|
||||||
|
if Num == 1 ->
|
||||||
|
ParamName ++ Acc;
|
||||||
|
true ->
|
||||||
|
[$, | ParamName] ++ Acc
|
||||||
|
end
|
||||||
|
end, [], lists:reverse(ParamNums)),
|
||||||
|
ParamNamesBin = list_to_binary(ParamNames),
|
||||||
|
|
||||||
|
ExecStmt = <<"EXECUTE ", NameBin/binary, " USING ",
|
||||||
|
ParamNamesBin/binary>>,
|
||||||
|
|
||||||
|
ParamVals = lists:zip(ParamNums, Params),
|
||||||
|
Stmts = lists:foldl(
|
||||||
|
fun({Num, Val}, Acc) ->
|
||||||
|
NumBin = mysql:encode(Num, true),
|
||||||
|
ValBin = mysql:encode(Val, true),
|
||||||
|
[<<"SET @", NumBin/binary, "=", ValBin/binary>> | Acc]
|
||||||
|
end, [ExecStmt], lists:reverse(ParamVals)),
|
||||||
|
Stmts.
|
||||||
|
|
||||||
|
atom_to_binary(Val) ->
|
||||||
|
<<_:4/binary, Bin/binary>> = term_to_binary(Val),
|
||||||
|
Bin.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Function: mysql_init(Sock, RecvPid, User, Password, LogFun)
|
%% Function: mysql_init(Sock, RecvPid, User, Password, LogFun)
|
||||||
%% Sock = term(), gen_tcp socket
|
%% Sock = term(), gen_tcp socket
|
||||||
|
@ -325,21 +586,29 @@ mysql_init(Sock, RecvPid, User, Password, LogFun) ->
|
||||||
AuthRes =
|
AuthRes =
|
||||||
case Caps band ?SECURE_CONNECTION of
|
case Caps band ?SECURE_CONNECTION of
|
||||||
?SECURE_CONNECTION ->
|
?SECURE_CONNECTION ->
|
||||||
mysql_auth:do_new_auth(Sock, RecvPid, InitSeqNum + 1, User, Password, Salt1, Salt2, LogFun);
|
mysql_auth:do_new_auth(
|
||||||
|
Sock, RecvPid, InitSeqNum + 1,
|
||||||
|
User, Password, Salt1, Salt2, LogFun);
|
||||||
_ ->
|
_ ->
|
||||||
mysql_auth:do_old_auth(Sock, RecvPid, InitSeqNum + 1, User, Password, Salt1, LogFun)
|
mysql_auth:do_old_auth(
|
||||||
|
Sock, RecvPid, InitSeqNum + 1, User, Password,
|
||||||
|
Salt1, LogFun)
|
||||||
end,
|
end,
|
||||||
case AuthRes of
|
case AuthRes of
|
||||||
{ok, <<0:8, _Rest/binary>>, _RecvNum} ->
|
{ok, <<0:8, _Rest/binary>>, _RecvNum} ->
|
||||||
{ok,Version};
|
{ok,Version};
|
||||||
{ok, <<255:8, Code:16/little, Message/binary>>, _RecvNum} ->
|
{ok, <<255:8, Code:16/little, Message/binary>>, _RecvNum} ->
|
||||||
mysql:log(LogFun, error, "mysql_conn: init error ~p: ~p~n", [Code, binary_to_list(Message)]),
|
?Log2(LogFun, error, "init error ~p: ~p",
|
||||||
|
[Code, binary_to_list(Message)]),
|
||||||
{error, binary_to_list(Message)};
|
{error, binary_to_list(Message)};
|
||||||
{ok, RecvPacket, _RecvNum} ->
|
{ok, RecvPacket, _RecvNum} ->
|
||||||
mysql:log(LogFun, error, "mysql_conn: init unknown error ~p~n", [binary_to_list(RecvPacket)]),
|
?Log2(LogFun, error,
|
||||||
|
"init unknown error ~p",
|
||||||
|
[binary_to_list(RecvPacket)]),
|
||||||
{error, binary_to_list(RecvPacket)};
|
{error, binary_to_list(RecvPacket)};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
mysql:log(LogFun, error, "mysql_conn: init failed receiving data : ~p~n", [Reason]),
|
?Log2(LogFun, error,
|
||||||
|
"init failed receiving data : ~p", [Reason]),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -355,8 +624,10 @@ greeting(Packet, LogFun) ->
|
||||||
<<Caps:16/little, Rest5/binary>> = Rest4,
|
<<Caps:16/little, Rest5/binary>> = Rest4,
|
||||||
<<ServerChar:16/binary-unit:8, Rest6/binary>> = Rest5,
|
<<ServerChar:16/binary-unit:8, Rest6/binary>> = Rest5,
|
||||||
{Salt2, _Rest7} = asciz(Rest6),
|
{Salt2, _Rest7} = asciz(Rest6),
|
||||||
mysql:log(LogFun, debug, "mysql_conn: greeting version ~p (protocol ~p) salt ~p caps ~p serverchar ~p salt2 ~p",
|
?Log2(LogFun, debug,
|
||||||
[Version, Protocol, Salt, Caps, ServerChar, Salt2]),
|
"greeting version ~p (protocol ~p) salt ~p caps ~p serverchar ~p"
|
||||||
|
"salt2 ~p",
|
||||||
|
[Version, Protocol, Salt, Caps, ServerChar, Salt2]),
|
||||||
{normalize_version(Version, LogFun), Salt, Salt2, Caps}.
|
{normalize_version(Version, LogFun), Salt, Salt2, Caps}.
|
||||||
|
|
||||||
%% part of greeting/2
|
%% part of greeting/2
|
||||||
|
@ -382,7 +653,7 @@ asciz(Data) when list(Data) ->
|
||||||
%% AffectedRows = int()
|
%% AffectedRows = int()
|
||||||
%% Reason = term()
|
%% Reason = term()
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
get_query_response(LogFun, RecvPid, Version, Options) ->
|
get_query_response(LogFun, RecvPid, Version) ->
|
||||||
case do_recv(LogFun, RecvPid, undefined) of
|
case do_recv(LogFun, RecvPid, undefined) of
|
||||||
{ok, <<Fieldcount:8, Rest/binary>>, _} ->
|
{ok, <<Fieldcount:8, Rest/binary>>, _} ->
|
||||||
case Fieldcount of
|
case Fieldcount of
|
||||||
|
@ -392,15 +663,15 @@ get_query_response(LogFun, RecvPid, Version, Options) ->
|
||||||
{updated, #mysql_result{affectedrows=AffectedRows}};
|
{updated, #mysql_result{affectedrows=AffectedRows}};
|
||||||
255 ->
|
255 ->
|
||||||
<<_Code:16/little, Message/binary>> = Rest,
|
<<_Code:16/little, Message/binary>> = Rest,
|
||||||
{error, #mysql_result{error=binary_to_list(Message)}};
|
{error, #mysql_result{error=Message}};
|
||||||
_ ->
|
_ ->
|
||||||
%% Tabular data received
|
%% Tabular data received
|
||||||
case get_fields(LogFun, RecvPid, [], Version) of
|
case get_fields(LogFun, RecvPid, [], Version) of
|
||||||
{ok, Fields} ->
|
{ok, Fields} ->
|
||||||
ResultType = get_option(result_type, Options, ?DEFAULT_RESULT_TYPE),
|
case get_rows(Fields, LogFun, RecvPid, []) of
|
||||||
case get_rows(Fieldcount, LogFun, RecvPid, ResultType, []) of
|
|
||||||
{ok, Rows} ->
|
{ok, Rows} ->
|
||||||
{data, #mysql_result{fieldinfo=Fields, rows=Rows}};
|
{data, #mysql_result{fieldinfo=Fields,
|
||||||
|
rows=Rows}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, #mysql_result{error=Reason}}
|
{error, #mysql_result{error=Reason}}
|
||||||
end;
|
end;
|
||||||
|
@ -440,13 +711,13 @@ get_fields(LogFun, RecvPid, Res, ?MYSQL_4_0) ->
|
||||||
<<Length:LengthL/little>> = LengthB,
|
<<Length:LengthL/little>> = LengthB,
|
||||||
{Type, Rest4} = get_with_length(Rest3),
|
{Type, Rest4} = get_with_length(Rest3),
|
||||||
{_Flags, _Rest5} = get_with_length(Rest4),
|
{_Flags, _Rest5} = get_with_length(Rest4),
|
||||||
This = {binary_to_list(Table),
|
This = {Table,
|
||||||
binary_to_list(Field),
|
Field,
|
||||||
Length,
|
Length,
|
||||||
%% TODO: Check on MySQL 4.0 if types are specified
|
%% TODO: Check on MySQL 4.0 if types are specified
|
||||||
%% using the same 4.1 formalism and could
|
%% using the same 4.1 formalism and could
|
||||||
%% be expanded to atoms:
|
%% be expanded to atoms:
|
||||||
binary_to_list(Type)},
|
Type},
|
||||||
get_fields(LogFun, RecvPid, [This | Res], ?MYSQL_4_0)
|
get_fields(LogFun, RecvPid, [This | Res], ?MYSQL_4_0)
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -475,9 +746,9 @@ get_fields(LogFun, RecvPid, Res, ?MYSQL_4_1) ->
|
||||||
Length:32/little, Type:8/little,
|
Length:32/little, Type:8/little,
|
||||||
_Flags:16/little, _Decimals:8/little,
|
_Flags:16/little, _Decimals:8/little,
|
||||||
_Rest7/binary>> = Rest6,
|
_Rest7/binary>> = Rest6,
|
||||||
|
|
||||||
This = {binary_to_list(Table),
|
This = {Table,
|
||||||
binary_to_list(Field),
|
Field,
|
||||||
Length,
|
Length,
|
||||||
get_field_datatype(Type)},
|
get_field_datatype(Type)},
|
||||||
get_fields(LogFun, RecvPid, [This | Res], ?MYSQL_4_1)
|
get_fields(LogFun, RecvPid, [This | Res], ?MYSQL_4_1)
|
||||||
|
@ -496,38 +767,32 @@ get_fields(LogFun, RecvPid, Res, ?MYSQL_4_1) ->
|
||||||
%% {error, Reason}
|
%% {error, Reason}
|
||||||
%% Rows = list() of [string()]
|
%% Rows = list() of [string()]
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
get_rows(N, LogFun, RecvPid, ResultType, Res) ->
|
get_rows(Fields, LogFun, RecvPid, Res) ->
|
||||||
case do_recv(LogFun, RecvPid, undefined) of
|
case do_recv(LogFun, RecvPid, undefined) of
|
||||||
{ok, Packet, _Num} ->
|
{ok, Packet, _Num} ->
|
||||||
case Packet of
|
case Packet of
|
||||||
<<254:8, Rest/binary>> when size(Rest) < 8 ->
|
<<254:8, Rest/binary>> when size(Rest) < 8 ->
|
||||||
{ok, lists:reverse(Res)};
|
{ok, lists:reverse(Res)};
|
||||||
_ ->
|
_ ->
|
||||||
{ok, This} = get_row(N, Packet, ResultType, []),
|
{ok, This} = get_row(Fields, Packet, []),
|
||||||
get_rows(N, LogFun, RecvPid, ResultType, [This | Res])
|
get_rows(Fields, LogFun, RecvPid, [This | Res])
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
%% part of get_rows/4
|
%% part of get_rows/4
|
||||||
get_row(0, _Data, _ResultType, Res) ->
|
get_row([], _Data, Res) ->
|
||||||
{ok, lists:reverse(Res)};
|
{ok, lists:reverse(Res)};
|
||||||
get_row(N, Data, ResultType, Res) ->
|
get_row([Field | OtherFields], Data, Res) ->
|
||||||
{Col, Rest} = get_with_length(Data),
|
{Col, Rest} = get_with_length(Data),
|
||||||
This = case Col of
|
This = case Col of
|
||||||
null ->
|
null ->
|
||||||
null;
|
undefined;
|
||||||
_ ->
|
_ ->
|
||||||
if
|
convert_type(Col, element(4, Field))
|
||||||
ResultType == list ->
|
|
||||||
binary_to_list(Col);
|
|
||||||
ResultType == binary ->
|
|
||||||
Col
|
|
||||||
end
|
|
||||||
end,
|
end,
|
||||||
get_row(N - 1, Rest, ResultType, [This | Res]).
|
get_row(OtherFields, Rest, [This | Res]).
|
||||||
|
|
||||||
get_with_length(<<251:8, Rest/binary>>) ->
|
get_with_length(<<251:8, Rest/binary>>) ->
|
||||||
{null, Rest};
|
{null, Rest};
|
||||||
|
@ -540,35 +805,6 @@ get_with_length(<<254:8, Length:64/little, Rest/binary>>) ->
|
||||||
get_with_length(<<Length:8, Rest/binary>>) when Length < 251 ->
|
get_with_length(<<Length:8, Rest/binary>>) when Length < 251 ->
|
||||||
split_binary(Rest, Length).
|
split_binary(Rest, Length).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Function: do_query(State, Query)
|
|
||||||
%% do_query(Sock, RecvPid, LogFun, Query)
|
|
||||||
%% Sock = term(), gen_tcp socket
|
|
||||||
%% RecvPid = pid(), mysql_recv process
|
|
||||||
%% LogFun = undefined | function() with arity 3
|
|
||||||
%% Query = string()
|
|
||||||
%% Descrip.: Send a MySQL query and block awaiting it's response.
|
|
||||||
%% Returns : result of get_query_response/2 | {error, Reason}
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
do_query(State, Query, Options) when is_record(State, state) ->
|
|
||||||
do_query(State#state.socket,
|
|
||||||
State#state.recv_pid,
|
|
||||||
State#state.log_fun,
|
|
||||||
Query,
|
|
||||||
State#state.mysql_version,
|
|
||||||
Options
|
|
||||||
).
|
|
||||||
|
|
||||||
do_query(Sock, RecvPid, LogFun, Query, Version, Options) when is_pid(RecvPid),
|
|
||||||
is_list(Query) ->
|
|
||||||
Packet = list_to_binary([?MYSQL_QUERY_OP, Query]),
|
|
||||||
case do_send(Sock, Packet, 0, LogFun) of
|
|
||||||
ok ->
|
|
||||||
get_query_response(LogFun, RecvPid, Version, Options);
|
|
||||||
{error, Reason} ->
|
|
||||||
Msg = io_lib:format("Failed sending data on socket : ~p", [Reason]),
|
|
||||||
{error, Msg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Function: do_send(Sock, Packet, SeqNum, LogFun)
|
%% Function: do_send(Sock, Packet, SeqNum, LogFun)
|
||||||
|
@ -581,7 +817,6 @@ do_query(Sock, RecvPid, LogFun, Query, Version, Options) when is_pid(RecvPid),
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
do_send(Sock, Packet, SeqNum, _LogFun) when is_binary(Packet), is_integer(SeqNum) ->
|
do_send(Sock, Packet, SeqNum, _LogFun) when is_binary(Packet), is_integer(SeqNum) ->
|
||||||
Data = <<(size(Packet)):24/little, SeqNum:8, Packet/binary>>,
|
Data = <<(size(Packet)):24/little, SeqNum:8, Packet/binary>>,
|
||||||
%%mysql:log(LogFun, debug, "mysql_conn: send packet ~p: ~p", [SeqNum, Data]),
|
|
||||||
gen_tcp:send(Sock, Data).
|
gen_tcp:send(Sock, Data).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -593,15 +828,16 @@ do_send(Sock, Packet, SeqNum, _LogFun) when is_binary(Packet), is_integer(SeqNum
|
||||||
%% Returns : Version = string()
|
%% Returns : Version = string()
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
normalize_version([$4,$.,$0|_T], LogFun) ->
|
normalize_version([$4,$.,$0|_T], LogFun) ->
|
||||||
mysql:log(LogFun, debug, "Switching to MySQL 4.0.x protocol.~n"),
|
?Log(LogFun, debug, "switching to MySQL 4.0.x protocol."),
|
||||||
?MYSQL_4_0;
|
?MYSQL_4_0;
|
||||||
normalize_version([$4,$.,$1|_T], _LogFun) ->
|
normalize_version([$4,$.,$1|_T], _LogFun) ->
|
||||||
?MYSQL_4_1;
|
?MYSQL_4_1;
|
||||||
normalize_version([$5|_T], _LogFun) ->
|
normalize_version([$5|_T], _LogFun) ->
|
||||||
%% MySQL version 5.x protocol is compliant with MySQL 4.1.x:
|
%% MySQL version 5.x protocol is compliant with MySQL 4.1.x:
|
||||||
?MYSQL_4_1;
|
?MYSQL_4_1;
|
||||||
normalize_version(_Other, LogFun) ->
|
normalize_version(_Other, LogFun) ->
|
||||||
mysql:log(LogFun, error, "MySQL version not supported: MySQL Erlang module might not work correctly.~n"),
|
?Log(LogFun, error, "MySQL version not supported: MySQL Erlang module "
|
||||||
|
"might not work correctly."),
|
||||||
%% Error, but trying the oldest protocol anyway:
|
%% Error, but trying the oldest protocol anyway:
|
||||||
?MYSQL_4_0.
|
?MYSQL_4_0.
|
||||||
|
|
||||||
|
@ -626,8 +862,7 @@ get_field_datatype(11) -> 'TIME';
|
||||||
get_field_datatype(12) -> 'DATETIME';
|
get_field_datatype(12) -> 'DATETIME';
|
||||||
get_field_datatype(13) -> 'YEAR';
|
get_field_datatype(13) -> 'YEAR';
|
||||||
get_field_datatype(14) -> 'NEWDATE';
|
get_field_datatype(14) -> 'NEWDATE';
|
||||||
get_field_datatype(16) -> 'BIT';
|
get_field_datatype(246) -> 'NEWDECIMAL';
|
||||||
get_field_datatype(246) -> 'DECIMAL';
|
|
||||||
get_field_datatype(247) -> 'ENUM';
|
get_field_datatype(247) -> 'ENUM';
|
||||||
get_field_datatype(248) -> 'SET';
|
get_field_datatype(248) -> 'SET';
|
||||||
get_field_datatype(249) -> 'TINYBLOB';
|
get_field_datatype(249) -> 'TINYBLOB';
|
||||||
|
@ -638,19 +873,41 @@ get_field_datatype(253) -> 'VAR_STRING';
|
||||||
get_field_datatype(254) -> 'STRING';
|
get_field_datatype(254) -> 'STRING';
|
||||||
get_field_datatype(255) -> 'GEOMETRY'.
|
get_field_datatype(255) -> 'GEOMETRY'.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
convert_type(Val, ColType) ->
|
||||||
%% Function: get_option(Key1, Options, Default) -> Value1
|
case ColType of
|
||||||
%% Options = [Option]
|
T when T == 'TINY';
|
||||||
%% Option = {Key2, Value2}
|
T == 'SHORT';
|
||||||
%% Key1 = Key2 = atom()
|
T == 'LONG';
|
||||||
%% Value1 = Value2 = Default = term()
|
T == 'LONGLONG';
|
||||||
%% Descrip.: Return the option associated with Key passed to squery/4
|
T == 'INT24';
|
||||||
%%--------------------------------------------------------------------
|
T == 'YEAR' ->
|
||||||
|
list_to_integer(binary_to_list(Val));
|
||||||
get_option(Key, Options, Default) ->
|
T when T == 'TIMESTAMP';
|
||||||
case lists:keysearch(Key, 1, Options) of
|
T == 'DATETIME' ->
|
||||||
{value, {_, Value}} ->
|
{ok, [Year, Month, Day, Hour, Minute, Second], _Leftovers} =
|
||||||
Value;
|
io_lib:fread("~d-~d-~d ~d:~d:~d", binary_to_list(Val)),
|
||||||
false ->
|
{datetime, {{Year, Month, Day}, {Hour, Minute, Second}}};
|
||||||
Default
|
'TIME' ->
|
||||||
|
{ok, [Hour, Minute, Second], _Leftovers} =
|
||||||
|
io_lib:fread("~d:~d:~d", binary_to_list(Val)),
|
||||||
|
{time, {Hour, Minute, Second}};
|
||||||
|
'DATE' ->
|
||||||
|
{ok, [Year, Month, Day], _Leftovers} =
|
||||||
|
io_lib:fread("~d-~d-~d", binary_to_list(Val)),
|
||||||
|
{date, {Year, Month, Day}};
|
||||||
|
T when T == 'DECIMAL';
|
||||||
|
T == 'NEWDECIMAL';
|
||||||
|
T == 'FLOAT';
|
||||||
|
T == 'DOUBLE' ->
|
||||||
|
{ok, [Num], _Leftovers} =
|
||||||
|
case io_lib:fread("~f", binary_to_list(Val)) of
|
||||||
|
{error, _} ->
|
||||||
|
io_lib:fread("~d", binary_to_list(Val));
|
||||||
|
Res ->
|
||||||
|
Res
|
||||||
|
end,
|
||||||
|
Num;
|
||||||
|
_Other ->
|
||||||
|
Val
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -105,8 +105,11 @@ init(Host, Port, LogFun, Parent) ->
|
||||||
},
|
},
|
||||||
loop(State);
|
loop(State);
|
||||||
E ->
|
E ->
|
||||||
mysql:log(LogFun, error, "mysql_recv: Failed connecting to ~p:~p : ~p",
|
LogFun(?MODULE, ?LINE, error,
|
||||||
[Host, Port, E]),
|
fun() ->
|
||||||
|
{"mysql_recv: Failed connecting to ~p:~p : ~p",
|
||||||
|
[Host, Port, E]}
|
||||||
|
end),
|
||||||
Msg = lists:flatten(io_lib:format("connect failed : ~p", [E])),
|
Msg = lists:flatten(io_lib:format("connect failed : ~p", [E])),
|
||||||
Parent ! {mysql_recv, self(), init, {error, Msg}}
|
Parent ! {mysql_recv, self(), init, {error, Msg}}
|
||||||
end.
|
end.
|
||||||
|
@ -127,11 +130,20 @@ loop(State) ->
|
||||||
Rest = sendpacket(State#state.parent, NewData),
|
Rest = sendpacket(State#state.parent, NewData),
|
||||||
loop(State#state{data = Rest});
|
loop(State#state{data = Rest});
|
||||||
{tcp_error, Sock, Reason} ->
|
{tcp_error, Sock, Reason} ->
|
||||||
mysql:log(State#state.log_fun, error, "mysql_recv: Socket ~p closed : ~p", [Sock, Reason]),
|
LogFun = State#state.log_fun,
|
||||||
|
LogFun(?MODULE, ?LINE, error,
|
||||||
|
fun() ->
|
||||||
|
{"mysql_recv: Socket ~p closed : ~p",
|
||||||
|
[Sock, Reason]}
|
||||||
|
end),
|
||||||
State#state.parent ! {mysql_recv, self(), closed, {error, Reason}},
|
State#state.parent ! {mysql_recv, self(), closed, {error, Reason}},
|
||||||
error;
|
error;
|
||||||
{tcp_closed, Sock} ->
|
{tcp_closed, Sock} ->
|
||||||
mysql:log(State#state.log_fun, debug, "mysql_recv: Socket ~p closed", [Sock]),
|
LogFun = State#state.log_fun,
|
||||||
|
LogFun(?MODULE, ?LINE, debug,
|
||||||
|
fun() ->
|
||||||
|
{"mysql_recv: Socket ~p closed", [Sock]}
|
||||||
|
end),
|
||||||
State#state.parent ! {mysql_recv, self(), closed, normal},
|
State#state.parent ! {mysql_recv, self(), closed, normal},
|
||||||
error
|
error
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -14,7 +14,7 @@ start_link() ->
|
||||||
supervisor:start_link(mysql_sup, []).
|
supervisor:start_link(mysql_sup, []).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
MysqlConfig = [pool, "localhost", "flukso", "your_mysql_password_here", "flukso"],
|
MysqlConfig = [pool, "localhost", "flukso", "xpsCcVsbecJMVCYF", "flukso"],
|
||||||
Mysql = {mysql,
|
Mysql = {mysql,
|
||||||
{mysql, start_link, MysqlConfig},
|
{mysql, start_link, MysqlConfig},
|
||||||
permanent, 3000, worker, [mysql]},
|
permanent, 3000, worker, [mysql]},
|
||||||
|
|
Loading…
Reference in New Issue