861 lines
26 KiB
Erlang
861 lines
26 KiB
Erlang
%%% File : mysql.erl
|
|
%%% Author : Magnus Ahltorp <ahltorp@nada.kth.se>
|
|
%%% Descrip.: MySQL client.
|
|
%%%
|
|
%%% Created : 4 Aug 2005 by Magnus Ahltorp <ahltorp@nada.kth.se>
|
|
%%%
|
|
%%% Copyright (c) 2001-2004 Kungliga Tekniska Högskolan
|
|
%%% See the file COPYING
|
|
%%%
|
|
%%% Modified: 9/12/2006 by Yariv Sadan <yarivvv@gmail.com>
|
|
%%% Note: Added support for prepared statements,
|
|
%%% transactions, better connection pooling, more efficient logging
|
|
%%% and made other internal enhancements.
|
|
%%%
|
|
%%% Modified: 9/23/2006 Rewrote the transaction handling code to
|
|
%%% provide a simpler, Mnesia-style transaction interface. Also,
|
|
%%% moved much of the prepared statement handling code to mysql_conn.erl
|
|
%%% and added versioning to prepared statements.
|
|
%%%
|
|
%%%
|
|
%%% Usage:
|
|
%%%
|
|
%%%
|
|
%%% Call one of the start-functions before any call to fetch/2
|
|
%%%
|
|
%%% start_link(PoolId, Host, User, Password, Database)
|
|
%%% start_link(PoolId, Host, Port, User, Password, Database)
|
|
%%% start_link(PoolId, Host, User, Password, Database, LogFun)
|
|
%%% start_link(PoolId, Host, Port, User, Password, Database, LogFun)
|
|
%%%
|
|
%%% (These functions also have non-linking coutnerparts.)
|
|
%%%
|
|
%%% PoolId is a connection pool identifier. If you want to have more
|
|
%%% than one connection to a server (or a set of MySQL replicas),
|
|
%%% add more with
|
|
%%%
|
|
%%% connect(PoolId, Host, Port, User, Password, Database, Reconnect)
|
|
%%%
|
|
%%% use 'undefined' as Port to get default MySQL port number (3306).
|
|
%%% MySQL querys will be sent in a per-PoolId round-robin fashion.
|
|
%%% Set Reconnect to 'true' if you want the dispatcher to try and
|
|
%%% open a new connection, should this one die.
|
|
%%%
|
|
%%% When you have a mysql_dispatcher running, this is how you make a
|
|
%%% query :
|
|
%%%
|
|
%%% fetch(PoolId, "select * from hello") -> Result
|
|
%%% Result = {data, MySQLRes} | {updated, MySQLRes} |
|
|
%%% {error, MySQLRes}
|
|
%%%
|
|
%%% Actual data can be extracted from MySQLRes by calling the following API
|
|
%%% functions:
|
|
%%% - on data received:
|
|
%%% FieldInfo = mysql:get_result_field_info(MysqlRes)
|
|
%%% AllRows = mysql:get_result_rows(MysqlRes)
|
|
%%% with FieldInfo = list() of {Table, Field, Length, Name}
|
|
%%% and AllRows = list() of list() representing records
|
|
%%% - on update:
|
|
%%% Affected = mysql:get_result_affected_rows(MysqlRes)
|
|
%%% with Affected = integer()
|
|
%%% - on error:
|
|
%%% Reason = mysql:get_result_reason(MysqlRes)
|
|
%%% with Reason = string()
|
|
%%%
|
|
%%% If you just want a single MySQL connection, or want to manage your
|
|
%%% connections yourself, you can use the mysql_conn module as a
|
|
%%% stand-alone single MySQL connection. See the comment at the top of
|
|
%%% mysql_conn.erl.
|
|
|
|
-module(mysql).
|
|
-behaviour(gen_server).
|
|
|
|
|
|
%% @type mysql_result() = term()
|
|
%% @type query_result = {data, mysql_result()} | {updated, mysql_result()} |
|
|
%% {error, mysql_result()}
|
|
|
|
|
|
%% External exports
|
|
-export([start_link/5,
|
|
start_link/6,
|
|
start_link/7,
|
|
start_link/8,
|
|
|
|
start/5,
|
|
start/6,
|
|
start/7,
|
|
start/8,
|
|
|
|
connect/7,
|
|
connect/8,
|
|
connect/9,
|
|
|
|
fetch/1,
|
|
fetch/2,
|
|
fetch/3,
|
|
|
|
prepare/2,
|
|
execute/1,
|
|
execute/2,
|
|
execute/3,
|
|
execute/4,
|
|
unprepare/1,
|
|
get_prepared/1,
|
|
get_prepared/2,
|
|
|
|
transaction/2,
|
|
transaction/3,
|
|
|
|
get_result_field_info/1,
|
|
get_result_rows/1,
|
|
get_result_affected_rows/1,
|
|
get_result_reason/1,
|
|
|
|
encode/1,
|
|
encode/2,
|
|
asciz_binary/2
|
|
]).
|
|
|
|
%% Internal exports - just for mysql_* modules
|
|
-export([log/4
|
|
]).
|
|
|
|
%% Internal exports - gen_server callbacks
|
|
-export([init/1,
|
|
handle_call/3,
|
|
handle_cast/2,
|
|
handle_info/2,
|
|
terminate/2,
|
|
code_change/3
|
|
]).
|
|
|
|
%% Records
|
|
-include("mysql.hrl").
|
|
|
|
-record(conn, {
|
|
pool_id, %% atom(), the pool's id
|
|
pid, %% pid(), mysql_conn process
|
|
reconnect, %% true | false, should mysql_dispatcher try
|
|
%% to reconnect if this connection dies?
|
|
host, %% string()
|
|
port, %% integer()
|
|
user, %% string()
|
|
password, %% string()
|
|
database, %% string()
|
|
encoding
|
|
}).
|
|
|
|
-record(state, {
|
|
%% gb_tree mapping connection
|
|
%% pool id to a connection pool tuple
|
|
conn_pools = gb_trees:empty(),
|
|
|
|
|
|
%% gb_tree mapping connection Pid
|
|
%% to pool id
|
|
pids_pools = gb_trees:empty(),
|
|
|
|
%% function for logging,
|
|
log_fun,
|
|
|
|
|
|
%% maps names to {Statement::binary(), Version::integer()} values
|
|
prepares = gb_trees:empty()
|
|
}).
|
|
|
|
%% Macros
|
|
-define(SERVER, mysql_dispatcher).
|
|
-define(STATE_VAR, mysql_connection_state).
|
|
-define(CONNECT_TIMEOUT, 5000).
|
|
-define(LOCAL_FILES, 128).
|
|
-define(PORT, 3306).
|
|
|
|
%% used for debugging
|
|
-define(L(Msg), io:format("~p:~b ~p ~n", [?MODULE, ?LINE, Msg])).
|
|
|
|
%% Log messages are designed to instantiated lazily only if the logging level
|
|
%% permits a log message to be logged
|
|
-define(Log(LogFun,Level,Msg),
|
|
LogFun(?MODULE,?LINE,Level,fun()-> {Msg,[]} end)).
|
|
-define(Log2(LogFun,Level,Msg,Params),
|
|
LogFun(?MODULE,?LINE,Level,fun()-> {Msg,Params} end)).
|
|
|
|
|
|
log(Module, Line, _Level, FormatFun) ->
|
|
{Format, Arguments} = FormatFun(),
|
|
io:format("~w:~b: "++ Format ++ "~n", [Module, Line] ++ Arguments).
|
|
|
|
|
|
%% External functions
|
|
|
|
%% @doc Starts the MySQL client gen_server process.
|
|
%%
|
|
%% The Port and LogFun parameters are optional.
|
|
%%
|
|
%% @spec start_link(PoolId::atom(), Host::string(), Port::integer(),
|
|
%% Username::string(), Password::string(), Database::string(),
|
|
%% LogFun::undefined | function() of arity 4) ->
|
|
%% {ok, Pid} | ignore | {error, Err}
|
|
start_link(PoolId, Host, User, Password, Database) ->
|
|
start_link(PoolId, Host, ?PORT, User, Password, Database).
|
|
|
|
start_link(PoolId, Host, Port, User, Password, Database) ->
|
|
start_link(PoolId, Host, Port, User, Password, Database, undefined,
|
|
undefined).
|
|
|
|
start_link(PoolId, Host, undefined, User, Password, Database, LogFun) ->
|
|
start_link(PoolId, Host, ?PORT, User, Password, Database, LogFun,
|
|
undefined);
|
|
start_link(PoolId, Host, Port, User, Password, Database, LogFun) ->
|
|
start_link(PoolId, Host, Port, User, Password, Database, LogFun,
|
|
undefined).
|
|
|
|
start_link(PoolId, Host, undefined, User, Password, Database, LogFun,
|
|
Encoding) ->
|
|
start1(PoolId, Host, ?PORT, User, Password, Database, LogFun, Encoding,
|
|
start_link);
|
|
start_link(PoolId, Host, Port, User, Password, Database, LogFun, Encoding) ->
|
|
start1(PoolId, Host, Port, User, Password, Database, LogFun, Encoding,
|
|
start_link).
|
|
|
|
%% @doc These functions are similar to their start_link counterparts,
|
|
%% but they call gen_server:start() instead of gen_server:start_link()
|
|
start(PoolId, Host, User, Password, Database) ->
|
|
start(PoolId, Host, ?PORT, User, Password, Database).
|
|
|
|
start(PoolId, Host, Port, User, Password, Database) ->
|
|
start(PoolId, Host, Port, User, Password, Database, undefined).
|
|
|
|
start(PoolId, Host, undefined, User, Password, Database, LogFun) ->
|
|
start(PoolId, Host, ?PORT, User, Password, Database, LogFun);
|
|
start(PoolId, Host, Port, User, Password, Database, LogFun) ->
|
|
start(PoolId, Host, Port, User, Password, Database, LogFun, undefined).
|
|
|
|
start(PoolId, Host, undefined, User, Password, Database, LogFun, Encoding) ->
|
|
start1(PoolId, Host, ?PORT, User, Password, Database, LogFun, Encoding,
|
|
start);
|
|
start(PoolId, Host, Port, User, Password, Database, LogFun, Encoding) ->
|
|
start1(PoolId, Host, Port, User, Password, Database, LogFun, Encoding,
|
|
start).
|
|
|
|
start1(PoolId, Host, Port, User, Password, Database, LogFun, Encoding,
|
|
StartFunc) ->
|
|
crypto:start(),
|
|
gen_server:StartFunc(
|
|
{local, ?SERVER}, ?MODULE,
|
|
[PoolId, Host, Port, User, Password, Database, LogFun, Encoding], []).
|
|
|
|
|
|
%% @equiv connect(PoolId, Host, Port, User, Password, Database, Encoding,
|
|
%% Reconnect, true)
|
|
connect(PoolId, Host, Port, User, Password, Database, Encoding, Reconnect) ->
|
|
connect(PoolId, Host, Port, User, Password, Database, Encoding,
|
|
Reconnect, true).
|
|
|
|
%% @doc Starts a MySQL connection and, if successful, add it to the
|
|
%% connection pool in the dispatcher.
|
|
%%
|
|
%% @spec: connect(PoolId::atom(), Host::string(), Port::integer() | undefined,
|
|
%% User::string(), Password::string(), Database::string(),
|
|
%% Encoding::string(), Reconnect::bool(), LinkConnection::bool()) ->
|
|
%% {ok, ConnPid} | {error, Reason}
|
|
connect(PoolId, Host, Port, User, Password, Database, Encoding, Reconnect,
|
|
LinkConnection) ->
|
|
Port1 = if Port == undefined -> ?PORT; true -> Port end,
|
|
Fun = if LinkConnection ->
|
|
fun mysql_conn:start_link/8;
|
|
true ->
|
|
fun mysql_conn:start/8
|
|
end,
|
|
|
|
{ok, LogFun} = gen_server:call(?SERVER, get_logfun),
|
|
case Fun(Host, Port1, User, Password, Database, LogFun,
|
|
Encoding, PoolId) of
|
|
{ok, ConnPid} ->
|
|
Conn = new_conn(PoolId, ConnPid, Reconnect, Host, Port1, User,
|
|
Password, Database, Encoding),
|
|
case gen_server:call(
|
|
?SERVER, {add_conn, Conn}) of
|
|
ok ->
|
|
{ok, ConnPid};
|
|
Res ->
|
|
Res
|
|
end;
|
|
Err->
|
|
Err
|
|
end.
|
|
|
|
new_conn(PoolId, ConnPid, Reconnect, Host, Port, User, Password, Database,
|
|
Encoding) ->
|
|
case Reconnect of
|
|
true ->
|
|
#conn{pool_id = PoolId,
|
|
pid = ConnPid,
|
|
reconnect = true,
|
|
host = Host,
|
|
port = Port,
|
|
user = User,
|
|
password = Password,
|
|
database = Database,
|
|
encoding = Encoding
|
|
};
|
|
false ->
|
|
#conn{pool_id = PoolId,
|
|
pid = ConnPid,
|
|
reconnect = false}
|
|
end.
|
|
|
|
%% @doc Fetch a query inside a transaction.
|
|
%%
|
|
%% @spec fetch(Query::iolist()) -> query_result()
|
|
fetch(Query) ->
|
|
case get(?STATE_VAR) of
|
|
undefined ->
|
|
{error, not_in_transaction};
|
|
State ->
|
|
mysql_conn:fetch_local(State, Query)
|
|
end.
|
|
|
|
%% @doc Send a query to a connection from the connection pool and wait
|
|
%% for the result. If this function is called inside a transaction,
|
|
%% the PoolId parameter is ignored.
|
|
%%
|
|
%% @spec fetch(PoolId::atom(), Query::iolist(), Timeout::integer()) ->
|
|
%% query_result()
|
|
fetch(PoolId, Query) ->
|
|
fetch(PoolId, Query, undefined).
|
|
|
|
fetch(PoolId, Query, Timeout) ->
|
|
case get(?STATE_VAR) of
|
|
undefined ->
|
|
call_server({fetch, PoolId, Query}, Timeout);
|
|
State ->
|
|
mysql_conn:fetch_local(State, Query)
|
|
end.
|
|
|
|
|
|
%% @doc Register a prepared statement with the dispatcher. This call does not
|
|
%% prepare the statement in any connections. The statement is prepared
|
|
%% lazily in each connection when it is told to execute the statement.
|
|
%% If the Name parameter matches the name of a statement that has
|
|
%% already been registered, the version of the statement is incremented
|
|
%% and all connections that have already prepared the statement will
|
|
%% prepare it again with the newest version.
|
|
%%
|
|
%% @spec prepare(Name::atom(), Query::iolist()) -> ok
|
|
prepare(Name, Query) ->
|
|
gen_server:cast(?SERVER, {prepare, Name, Query}).
|
|
|
|
%% @doc Unregister a statement that has previously been register with
|
|
%% the dispatcher. All calls to execute() with the given statement
|
|
%% will fail once the statement is unprepared. If the statement hasn't
|
|
%% been prepared, nothing happens.
|
|
%%
|
|
%% @spec unprepare(Name::atom()) -> ok
|
|
unprepare(Name) ->
|
|
gen_server:cast(?SERVER, {unprepare, Name}).
|
|
|
|
%% @doc Get the prepared statement with the given name.
|
|
%%
|
|
%% This function is called from mysql_conn when the connection is
|
|
%% told to execute a prepared statement it has not yet prepared, or
|
|
%% when it is told to execute a statement inside a transaction and
|
|
%% it's not sure that it has the latest version of the statement.
|
|
%%
|
|
%% If the latest version of the prepared statement matches the Version
|
|
%% parameter, the return value is {ok, latest}. This saves the cost
|
|
%% of sending the query when the connection already has the latest version.
|
|
%%
|
|
%% @spec get_prepared(Name::atom(), Version::integer()) ->
|
|
%% {ok, latest} | {ok, Statement::binary()} | {error, Err}
|
|
get_prepared(Name) ->
|
|
get_prepared(Name, undefined).
|
|
get_prepared(Name, Version) ->
|
|
gen_server:call(?SERVER, {get_prepared, Name, Version}).
|
|
|
|
|
|
%% @doc Execute a query inside a transaction.
|
|
%%
|
|
%% @spec execute(Name::atom, Params::[term()]) -> mysql_result()
|
|
execute(Name) ->
|
|
execute(Name, []).
|
|
|
|
execute(Name, Params) when is_atom(Name), is_list(Params) ->
|
|
case get(?STATE_VAR) of
|
|
undefined ->
|
|
{error, not_in_transaction};
|
|
State ->
|
|
mysql_conn:execute_local(State, Name, Params)
|
|
end;
|
|
|
|
%% @doc Execute a query in the connection pool identified by
|
|
%% PoolId. This function optionally accepts a list of parameters to pass
|
|
%% to the prepared statement and a Timeout parameter.
|
|
%% If this function is called inside a transaction, the PoolId paramter is
|
|
%% ignored.
|
|
%%
|
|
%% @spec execute(PoolId::atom(), Name::atom(), Params::[term()],
|
|
%% Timeout::integer()) -> mysql_result()
|
|
execute(PoolId, Name) when is_atom(PoolId), is_atom(Name) ->
|
|
execute(PoolId, Name, []).
|
|
|
|
execute(PoolId, Name, Timeout) when is_integer(Timeout) ->
|
|
execute(PoolId, Name, [], Timeout);
|
|
|
|
execute(PoolId, Name, Params) when is_list(Params) ->
|
|
execute(PoolId, Name, Params, undefined).
|
|
|
|
execute(PoolId, Name, Params, Timeout) ->
|
|
case get(?STATE_VAR) of
|
|
undefined ->
|
|
call_server({execute, PoolId, Name, Params}, Timeout);
|
|
State ->
|
|
case mysql_conn:execute_local(State, Name, Params) of
|
|
{ok, Res, NewState} ->
|
|
put(?STATE_VAR, NewState),
|
|
Res;
|
|
Err ->
|
|
Err
|
|
end
|
|
end.
|
|
|
|
%% @doc Execute a transaction in a connection belonging to the connection pool.
|
|
%% Fun is a function containing a sequence of calls to fetch() and/or
|
|
%% execute().
|
|
%% If an error occurs, or if the function does any of the following:
|
|
%%
|
|
%% - throw(error)
|
|
%% - throw({error, Err})
|
|
%% - return error
|
|
%% - return {error, Err}
|
|
%% - exit(Reason)
|
|
%%
|
|
%% the transaction is automatically rolled back.
|
|
%%
|
|
%% @spec transaction(PoolId::atom(), Fun::function()) ->
|
|
%% {atomic, Result} | {aborted, {Reason, {rollback_result, Result}}}
|
|
transaction(PoolId, Fun) ->
|
|
transaction(PoolId, Fun, undefined).
|
|
|
|
transaction(PoolId, Fun, Timeout) ->
|
|
case get(?STATE_VAR) of
|
|
undefined ->
|
|
call_server({transaction, PoolId, Fun}, Timeout);
|
|
State ->
|
|
case mysql_conn:get_pool_id(State) of
|
|
PoolId ->
|
|
case catch Fun() of
|
|
error = Err -> throw(Err);
|
|
{error, _} = Err -> throw(Err);
|
|
{'EXIT', _} = Err -> throw(Err);
|
|
Other -> {atomic, Other}
|
|
end;
|
|
_Other ->
|
|
call_server({transaction, PoolId, Fun}, Timeout)
|
|
end
|
|
end.
|
|
|
|
%% @doc Extract the FieldInfo from MySQL Result on data received.
|
|
%%
|
|
%% @spec get_result_field_info(MySQLRes::mysql_result()) ->
|
|
%% [{Table, Field, Length, Name}]
|
|
get_result_field_info(#mysql_result{fieldinfo = FieldInfo}) ->
|
|
FieldInfo.
|
|
|
|
%% @doc Extract the Rows from MySQL Result on data received
|
|
%%
|
|
%% @spec get_result_rows(MySQLRes::mysql_result()) -> [Row::list()]
|
|
get_result_rows(#mysql_result{rows=AllRows}) ->
|
|
AllRows.
|
|
|
|
%% @doc Extract the Rows from MySQL Result on update
|
|
%%
|
|
%% @spec get_result_affected_rows(MySQLRes::mysql_result()) ->
|
|
%% AffectedRows::integer()
|
|
get_result_affected_rows(#mysql_result{affectedrows=AffectedRows}) ->
|
|
AffectedRows.
|
|
|
|
%% @doc Extract the error Reason from MySQL Result on error
|
|
%%
|
|
%% @spec get_result_reason(MySQLRes::mysql_result()) ->
|
|
%% Reason::string()
|
|
get_result_reason(#mysql_result{error=Reason}) ->
|
|
Reason.
|
|
|
|
connect(PoolId, Host, undefined, User, Password, Database, Reconnect) ->
|
|
connect(PoolId, Host, ?PORT, User, Password, Database, undefined,
|
|
Reconnect).
|
|
|
|
%% gen_server callbacks
|
|
|
|
init([PoolId, Host, Port, User, Password, Database, LogFun, Encoding]) ->
|
|
LogFun1 = if LogFun == undefined -> fun log/4; true -> LogFun end,
|
|
case mysql_conn:start(Host, Port, User, Password, Database, LogFun1,
|
|
Encoding, PoolId) of
|
|
{ok, ConnPid} ->
|
|
Conn = new_conn(PoolId, ConnPid, true, Host, Port, User, Password,
|
|
Database, Encoding),
|
|
State = #state{log_fun = LogFun1},
|
|
{ok, add_conn(Conn, State)};
|
|
{error, Reason} ->
|
|
?Log(LogFun1, error,
|
|
"failed starting first MySQL connection handler, "
|
|
"exiting"),
|
|
{stop, {error, Reason}}
|
|
end.
|
|
|
|
handle_call({fetch, PoolId, Query}, From, State) ->
|
|
fetch_queries(PoolId, From, State, [Query]);
|
|
|
|
handle_call({get_prepared, Name, Version}, _From, State) ->
|
|
case gb_trees:lookup(Name, State#state.prepares) of
|
|
none ->
|
|
{reply, {error, {undefined, Name}}, State};
|
|
{value, {_StmtBin, Version1}} when Version1 == Version ->
|
|
{reply, {ok, latest}, State};
|
|
{value, Stmt} ->
|
|
{reply, {ok, Stmt}, State}
|
|
end;
|
|
|
|
handle_call({execute, PoolId, Name, Params}, From, State) ->
|
|
with_next_conn(
|
|
PoolId, State,
|
|
fun(Conn, State1) ->
|
|
case gb_trees:lookup(Name, State1#state.prepares) of
|
|
none ->
|
|
{reply, {error, {no_such_statement, Name}}, State1};
|
|
{value, {_Stmt, Version}} ->
|
|
mysql_conn:execute(Conn#conn.pid, Name,
|
|
Version, Params, From),
|
|
{noreply, State1}
|
|
end
|
|
end);
|
|
|
|
handle_call({transaction, PoolId, Fun}, From, State) ->
|
|
with_next_conn(
|
|
PoolId, State,
|
|
fun(Conn, State1) ->
|
|
mysql_conn:transaction(Conn#conn.pid, Fun, From),
|
|
{noreply, State1}
|
|
end);
|
|
|
|
handle_call({add_conn, Conn}, _From, State) ->
|
|
NewState = add_conn(Conn, State),
|
|
{PoolId, ConnPid} = {Conn#conn.pool_id, Conn#conn.pid},
|
|
LogFun = State#state.log_fun,
|
|
?Log2(LogFun, normal,
|
|
"added connection with id '~p' (pid ~p) to my list",
|
|
[PoolId, ConnPid]),
|
|
{reply, ok, NewState};
|
|
|
|
handle_call(get_logfun, _From, State) ->
|
|
{reply, {ok, State#state.log_fun}, State}.
|
|
|
|
handle_cast({prepare, Name, Stmt}, State) ->
|
|
LogFun = State#state.log_fun,
|
|
Version1 =
|
|
case gb_trees:lookup(Name, State#state.prepares) of
|
|
{value, {_Stmt, Version}} ->
|
|
Version + 1;
|
|
none ->
|
|
1
|
|
end,
|
|
?Log2(LogFun, debug,
|
|
"received prepare/2: ~p (ver ~p) ~p", [Name, Version1, Stmt]),
|
|
{noreply, State#state{prepares =
|
|
gb_trees:enter(Name, {Stmt, Version1},
|
|
State#state.prepares)}};
|
|
|
|
handle_cast({unprepare, Name}, State) ->
|
|
LogFun = State#state.log_fun,
|
|
?Log2(LogFun, debug, "received unprepare/1: ~p", [Name]),
|
|
State1 =
|
|
case gb_trees:lookup(Name, State#state.prepares) of
|
|
none ->
|
|
?Log2(LogFun, warn, "trying to unprepare a non-existing "
|
|
"statement: ~p", [Name]),
|
|
State;
|
|
{value, _Stmt} ->
|
|
State#state{prepares =
|
|
gb_trees:delete(Name, State#state.prepares)}
|
|
end,
|
|
{noreply, State1}.
|
|
|
|
%% Called when a connection to the database has been lost. If
|
|
%% The 'reconnect' flag was set to true for the connection, we attempt
|
|
%% to establish a new connection to the database.
|
|
handle_info({'DOWN', _MonitorRef, process, Pid, Info}, State) ->
|
|
LogFun = State#state.log_fun,
|
|
case remove_conn(Pid, State) of
|
|
{ok, Conn, NewState} ->
|
|
LogLevel = case Info of
|
|
normal -> normal;
|
|
_ -> error
|
|
end,
|
|
?Log2(LogFun, LogLevel,
|
|
"connection pid ~p exited : ~p", [Pid, Info]),
|
|
case Conn#conn.reconnect of
|
|
true ->
|
|
start_reconnect(Conn, LogFun);
|
|
false ->
|
|
ok
|
|
end,
|
|
{noreply, NewState};
|
|
error ->
|
|
?Log2(LogFun, error,
|
|
"received 'DOWN' signal from pid ~p not in my list", [Pid]),
|
|
{noreply, State}
|
|
end.
|
|
|
|
terminate(Reason, State) ->
|
|
LogFun = State#state.log_fun,
|
|
LogLevel = case Reason of
|
|
normal -> debug;
|
|
_ -> error
|
|
end,
|
|
?Log2(LogFun, LogLevel, "terminating with reason: ~p", [Reason]),
|
|
Reason.
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
{ok, State}.
|
|
|
|
|
|
%% Internal functions
|
|
|
|
fetch_queries(PoolId, From, State, QueryList) ->
|
|
with_next_conn(
|
|
PoolId, State,
|
|
fun(Conn, State1) ->
|
|
Pid = Conn#conn.pid,
|
|
mysql_conn:fetch(Pid, QueryList, From),
|
|
{noreply, State1}
|
|
end).
|
|
|
|
with_next_conn(PoolId, State, Fun) ->
|
|
case get_next_conn(PoolId, State) of
|
|
{ok, Conn, NewState} ->
|
|
Fun(Conn, NewState);
|
|
error ->
|
|
%% we have no active connection matching PoolId
|
|
{reply, {error, {no_connection_in_pool, PoolId}}, State}
|
|
end.
|
|
|
|
call_server(Msg, Timeout) ->
|
|
if Timeout == undefined ->
|
|
gen_server:call(?SERVER, Msg);
|
|
true ->
|
|
gen_server:call(?SERVER, Msg, Timeout)
|
|
end.
|
|
|
|
add_conn(Conn, State) ->
|
|
Pid = Conn#conn.pid,
|
|
erlang:monitor(process, Conn#conn.pid),
|
|
PoolId = Conn#conn.pool_id,
|
|
ConnPools = State#state.conn_pools,
|
|
NewPool =
|
|
case gb_trees:lookup(PoolId, ConnPools) of
|
|
none ->
|
|
{[Conn],[]};
|
|
{value, {Unused, Used}} ->
|
|
{[Conn | Unused], Used}
|
|
end,
|
|
State#state{conn_pools =
|
|
gb_trees:enter(PoolId, NewPool,
|
|
ConnPools),
|
|
pids_pools = gb_trees:enter(Pid, PoolId,
|
|
State#state.pids_pools)}.
|
|
|
|
remove_pid_from_list(Pid, Conns) ->
|
|
lists:foldl(
|
|
fun(OtherConn, {NewConns, undefined}) ->
|
|
if OtherConn#conn.pid == Pid ->
|
|
{NewConns, OtherConn};
|
|
true ->
|
|
{[OtherConn | NewConns], undefined}
|
|
end;
|
|
(OtherConn, {NewConns, FoundConn}) ->
|
|
{[OtherConn|NewConns], FoundConn}
|
|
end, {[],undefined}, lists:reverse(Conns)).
|
|
|
|
remove_pid_from_lists(Pid, Conns1, Conns2) ->
|
|
case remove_pid_from_list(Pid, Conns1) of
|
|
{NewConns1, undefined} ->
|
|
{NewConns2, Conn} = remove_pid_from_list(Pid, Conns2),
|
|
{Conn, {NewConns1, NewConns2}};
|
|
{NewConns1, Conn} ->
|
|
{Conn, {NewConns1, Conns2}}
|
|
end.
|
|
|
|
remove_conn(Pid, State) ->
|
|
PidsPools = State#state.pids_pools,
|
|
case gb_trees:lookup(Pid, PidsPools) of
|
|
none ->
|
|
error;
|
|
{value, PoolId} ->
|
|
ConnPools = State#state.conn_pools,
|
|
case gb_trees:lookup(PoolId, ConnPools) of
|
|
none ->
|
|
error;
|
|
{value, {Unused, Used}} ->
|
|
{Conn, NewPool} = remove_pid_from_lists(Pid, Unused, Used),
|
|
NewConnPools = gb_trees:enter(PoolId, NewPool, ConnPools),
|
|
{ok, Conn, State#state{conn_pools = NewConnPools,
|
|
pids_pools =
|
|
gb_trees:delete(Pid, PidsPools)}}
|
|
end
|
|
end.
|
|
|
|
get_next_conn(PoolId, State) ->
|
|
ConnPools = State#state.conn_pools,
|
|
case gb_trees:lookup(PoolId, ConnPools) of
|
|
none ->
|
|
error;
|
|
{value, {[],[]}} ->
|
|
error;
|
|
%% We maintain 2 lists: one for unused connections and one for used
|
|
%% connections. When we run out of unused connections, we recycle
|
|
%% the list of used connections.
|
|
{value, {[], Used}} ->
|
|
[Conn | Conns] = lists:reverse(Used),
|
|
{ok, Conn,
|
|
State#state{conn_pools =
|
|
gb_trees:enter(PoolId, {Conns, [Conn]}, ConnPools)}};
|
|
{value, {[Conn|Unused], Used}} ->
|
|
{ok, Conn, State#state{
|
|
conn_pools =
|
|
gb_trees:enter(PoolId, {Unused, [Conn|Used]},
|
|
ConnPools)}}
|
|
end.
|
|
|
|
start_reconnect(Conn, LogFun) ->
|
|
Pid = spawn(fun () ->
|
|
reconnect_loop(Conn#conn{pid = undefined}, LogFun, 0)
|
|
end),
|
|
{PoolId, Host, Port} = {Conn#conn.pool_id, Conn#conn.host, Conn#conn.port},
|
|
?Log2(LogFun, debug,
|
|
"started pid ~p to try and reconnect to ~p:~s:~p (replacing "
|
|
"connection with pid ~p)",
|
|
[Pid, PoolId, Host, Port, Conn#conn.pid]),
|
|
ok.
|
|
|
|
reconnect_loop(Conn, LogFun, N) ->
|
|
{PoolId, Host, Port} = {Conn#conn.pool_id, Conn#conn.host, Conn#conn.port},
|
|
case connect(PoolId,
|
|
Host,
|
|
Port,
|
|
Conn#conn.user,
|
|
Conn#conn.password,
|
|
Conn#conn.database,
|
|
Conn#conn.encoding,
|
|
Conn#conn.reconnect) of
|
|
{ok, ConnPid} ->
|
|
?Log2(LogFun, debug,
|
|
"managed to reconnect to ~p:~s:~p "
|
|
"(connection pid ~p)", [PoolId, Host, Port, ConnPid]),
|
|
ok;
|
|
{error, Reason} ->
|
|
%% log every once in a while
|
|
NewN = case N of
|
|
10 ->
|
|
?Log2(LogFun, debug,
|
|
"reconnect: still unable to connect to "
|
|
"~p:~s:~p (~p)", [PoolId, Host, Port, Reason]),
|
|
0;
|
|
_ ->
|
|
N + 1
|
|
end,
|
|
%% sleep between every unsuccessful attempt
|
|
timer:sleep(5 * 1000),
|
|
reconnect_loop(Conn, LogFun, NewN)
|
|
end.
|
|
|
|
|
|
%% @doc Encode a value so that it can be included safely in a MySQL query.
|
|
%%
|
|
%% @spec encode(Val::term(), AsBinary::bool()) ->
|
|
%% string() | binary() | {error, Error}
|
|
encode(Val) ->
|
|
encode(Val, false).
|
|
encode(Val, false) when Val == undefined; Val == null ->
|
|
"null";
|
|
encode(Val, true) when Val == undefined; Val == null ->
|
|
<<"null">>;
|
|
encode(Val, false) when is_binary(Val) ->
|
|
binary_to_list(quote(Val));
|
|
encode(Val, true) when is_binary(Val) ->
|
|
quote(Val);
|
|
encode(Val, true) ->
|
|
list_to_binary(encode(Val,false));
|
|
encode(Val, false) when is_atom(Val) ->
|
|
quote(atom_to_list(Val));
|
|
encode(Val, false) when is_list(Val) ->
|
|
quote(Val);
|
|
encode(Val, false) when is_integer(Val) ->
|
|
integer_to_list(Val);
|
|
encode(Val, false) when is_float(Val) ->
|
|
[Res] = io_lib:format("~w", [Val]),
|
|
Res;
|
|
encode({datetime, Val}, AsBinary) ->
|
|
encode(Val, AsBinary);
|
|
encode({{Year, Month, Day}, {Hour, Minute, Second}}, false) ->
|
|
Res = two_digits([Year, Month, Day, Hour, Minute, Second]),
|
|
lists:flatten(Res);
|
|
encode({TimeType, Val}, AsBinary)
|
|
when TimeType == 'date';
|
|
TimeType == 'time' ->
|
|
encode(Val, AsBinary);
|
|
encode({Time1, Time2, Time3}, false) ->
|
|
Res = two_digits([Time1, Time2, Time3]),
|
|
lists:flatten(Res);
|
|
encode(Val, _AsBinary) ->
|
|
{error, {unrecognized_value, Val}}.
|
|
|
|
two_digits(Nums) when is_list(Nums) ->
|
|
[two_digits(Num) || Num <- Nums];
|
|
two_digits(Num) ->
|
|
[Str] = io_lib:format("~b", [Num]),
|
|
case length(Str) of
|
|
1 -> [$0 | Str];
|
|
_ -> Str
|
|
end.
|
|
|
|
%% Quote a string or binary value so that it can be included safely in a
|
|
%% MySQL query.
|
|
quote(String) when is_list(String) ->
|
|
[39 | lists:reverse([39 | quote(String, [])])]; %% 39 is $'
|
|
quote(Bin) when is_binary(Bin) ->
|
|
list_to_binary(quote(binary_to_list(Bin))).
|
|
|
|
quote([], Acc) ->
|
|
Acc;
|
|
quote([0 | Rest], Acc) ->
|
|
quote(Rest, [$0, $\\ | Acc]);
|
|
quote([10 | Rest], Acc) ->
|
|
quote(Rest, [$n, $\\ | Acc]);
|
|
quote([13 | Rest], Acc) ->
|
|
quote(Rest, [$r, $\\ | Acc]);
|
|
quote([$\\ | Rest], Acc) ->
|
|
quote(Rest, [$\\ , $\\ | Acc]);
|
|
quote([39 | Rest], Acc) -> %% 39 is $'
|
|
quote(Rest, [39, $\\ | Acc]); %% 39 is $'
|
|
quote([34 | Rest], Acc) -> %% 34 is $"
|
|
quote(Rest, [34, $\\ | Acc]); %% 34 is $"
|
|
quote([26 | Rest], Acc) ->
|
|
quote(Rest, [$Z, $\\ | Acc]);
|
|
quote([C | Rest], Acc) ->
|
|
quote(Rest, [C | Acc]).
|
|
|
|
|
|
%% @doc Find the first zero-byte in Data and add everything before it
|
|
%% to Acc, as a string.
|
|
%%
|
|
%% @spec asciz_binary(Data::binary(), Acc::list()) ->
|
|
%% {NewList::list(), Rest::binary()}
|
|
asciz_binary(<<>>, Acc) ->
|
|
{lists:reverse(Acc), <<>>};
|
|
asciz_binary(<<0:8, Rest/binary>>, Acc) ->
|
|
{lists:reverse(Acc), Rest};
|
|
asciz_binary(<<C:8, Rest/binary>>, Acc) ->
|
|
asciz_binary(Rest, [C | Acc]).
|